You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/15 07:06:29 UTC

[pulsar] 02/02: [fix][cpp] Support retry and apply operation timeout for lookup requests (#17410)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 590d91f4a70a67415b0ccd232c3294fcec80af5a
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 15 14:38:02 2022 +0800

    [fix][cpp] Support retry and apply operation timeout for lookup requests (#17410)
    
    ### Motivation
    
    Currently the operation timeout only works for requests other than
    lookup, like SEND and FLOW. However, the lookup requests, which are sent
    by `LookupService`, should also apply the operation timeout, see
    
    https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L1019-L1025
    
    In addition, a failed lookup was never retried, even when it failed for a
    retryable reason. For example, if some of the configured brokers were
    down, the C++ client would fail immediately.
    
    Therefore, this PR retries lookups in the following cases:
    - The connection cannot be established, unless the failure is a TCP
      connect timeout, which is governed by the connection timeout setting.
    - A `ServiceNotReady` error is received, unless it is caused by a
      `PulsarServerException` (e.g. the listener name is wrong).
    
    The operation timeout is then applied so that the retries cannot go on
    forever.
    
    ### Modifications
    
    - Add a `ResultRetryable` error code, which should only be used
      internally. Complete the futures with this error in the cases
      described above.
    - Add a `RetryableLookupService` implementation that retries based on
      the backoff policy. In `ClientImpl`, wrap the concrete `LookupService`
      implementations with this class instead of using them directly.
    
    The following tests are added:
    - `ClientTest.testMultiBrokerUrl`: verify that when multiple brokers are
      configured, creating a producer, consumer or reader still succeeds even
      if one of the brokers is unavailable.
    - `LookupServiceTest.testRetry`: verify that all lookup methods can be
      retried.
    - `LookupServiceTest.testTimeout`: verify that all lookup methods are
      completed with `ResultTimeout` if no brokers are available.
    
    ### TODO
    
    In the future, we should add a dedicated lookup timeout to replace the
    operation timeout for lookup requests, and a separate connection pool
    for lookups; see PIP-91.
    
    * Handle ResultRetryable in handleDisconnection
    
    (cherry picked from commit 86ea31b2409a7b62624278dc133d443ca62ffb76)
---
 pulsar-client-cpp/include/pulsar/Result.h         |   3 +-
 pulsar-client-cpp/lib/BinaryProtoLookupService.cc |   3 +-
 pulsar-client-cpp/lib/ClientConnection.cc         |  54 ++++----
 pulsar-client-cpp/lib/ClientImpl.cc               |  14 +-
 pulsar-client-cpp/lib/ClientImpl.h                |   2 +-
 pulsar-client-cpp/lib/HTTPLookupService.cc        |   3 +
 pulsar-client-cpp/lib/HandlerBase.cc              |  14 +-
 pulsar-client-cpp/lib/Result.cc                   |   3 +
 pulsar-client-cpp/lib/RetryableLookupService.h    | 151 ++++++++++++++++++++++
 pulsar-client-cpp/lib/ServiceNameResolver.h       |   2 +
 pulsar-client-cpp/lib/SynchronizedHashMap.h       |   8 +-
 pulsar-client-cpp/lib/Utils.h                     |   2 +
 pulsar-client-cpp/tests/ClientTest.cc             |  29 ++++-
 pulsar-client-cpp/tests/LookupServiceTest.cc      |  95 +++++++++++++-
 pulsar-client-cpp/tests/PulsarFriend.h            |  11 ++
 15 files changed, 348 insertions(+), 46 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index a7fdd04f99a..cc7b457528e 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -29,7 +29,8 @@ namespace pulsar {
  */
 enum Result
 {
-    ResultOk,  /// Operation successful
+    ResultRetryable = -1,  /// An internal error code used for retry
+    ResultOk = 0,          /// Operation successful
 
     ResultUnknownError,  /// Unknown error happened on broker
 
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index bff29237c71..ff42b91b3f7 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -111,7 +111,6 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
                                                                   LookupDataResultPromisePtr promise) {
     if (result != ResultOk) {
         promise->setFailed(result);
-        Future<Result, LookupDataResultPtr> future = promise->getFuture();
         return;
     }
     LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
@@ -160,7 +159,7 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
                                                                const ClientConnectionWeakPtr& clientCnx,
                                                                NamespaceTopicsPromisePtr promise) {
     if (result != ResultOk) {
-        promise->setFailed(ResultConnectError);
+        promise->setFailed(result);
         return;
     }
 
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index e6cab07cb9b..45733248c1e 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -51,7 +51,7 @@ static const uint32_t DefaultBufferSize = 64 * 1024;
 static const int KeepAliveIntervalInSeconds = 30;
 
 // Convert error codes from protobuf to client API Result
-static Result getResult(ServerError serverError) {
+static Result getResult(ServerError serverError, const std::string& message) {
     switch (serverError) {
         case UnknownError:
             return ResultUnknownError;
@@ -75,7 +75,9 @@ static Result getResult(ServerError serverError) {
             return ResultConsumerBusy;
 
         case ServiceNotReady:
-            return ResultServiceUnitNotReady;
+            // If the error is not caused by a PulsarServerException, treat it as retryable.
+            return (message.find("PulsarServerException") == std::string::npos) ? ResultRetryable
+                                                                                : ResultServiceUnitNotReady;
 
         case ProducerBlockedQuotaExceededError:
             return ResultProducerBlockedQuotaExceededError;
@@ -138,7 +140,7 @@ static Result getResult(ServerError serverError) {
 }
 
 inline std::ostream& operator<<(std::ostream& os, ServerError error) {
-    os << getResult(error);
+    os << getResult(error, "");
     return os;
 }
 
@@ -182,7 +184,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
         consumerStatsRequestTimer_ = executor_->createDeadlineTimer();
     } catch (const boost::system::system_error& e) {
         LOG_ERROR("Failed to initialize connection: " << e.what());
-        close();
+        close(ResultRetryable);
         return;
     }
 
@@ -368,7 +370,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
             cnxString_ = cnxStringStream.str();
         } catch (const boost::system::system_error& e) {
             LOG_ERROR("Failed to get endpoint: " << e.what());
-            close();
+            close(ResultRetryable);
             return;
         }
         if (logicalAddress_ == physicalAddress_) {
@@ -450,11 +452,16 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
                                    std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
                                              std::placeholders::_1, ++endpointIterator));
         } else {
-            close();
+            if (err == boost::asio::error::operation_aborted) {
+                // TCP connect timeout, which is not retryable
+                close();
+            } else {
+                close(ResultRetryable);
+            }
         }
     } else {
         LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
-        close();
+        close(ResultRetryable);
     }
 }
 
@@ -913,7 +920,8 @@ void ClientConnection::handleIncomingCommand() {
                                                      << " error: " << partitionMetadataResponse.error()
                                                      << " msg: " << partitionMetadataResponse.message());
                                 checkServerError(partitionMetadataResponse.error());
-                                lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error()));
+                                lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error(),
+                                                                       partitionMetadataResponse.message()));
                             } else {
                                 LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
                                                      << partitionMetadataResponse.request_id()
@@ -952,7 +960,8 @@ void ClientConnection::handleIncomingCommand() {
                                 LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
                                                      << consumerStatsResponse.error_message());
                             }
-                            consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code()));
+                            consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code(),
+                                                                     consumerStatsResponse.error_message()));
                         } else {
                             LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer stats "
                                                     "response from server. req_id: "
@@ -1000,7 +1009,8 @@ void ClientConnection::handleIncomingCommand() {
                                           << " error: " << lookupTopicResponse.error()
                                           << " msg: " << lookupTopicResponse.message());
                                 checkServerError(lookupTopicResponse.error());
-                                lookupDataPromise->setFailed(getResult(lookupTopicResponse.error()));
+                                lookupDataPromise->setFailed(
+                                    getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
                             } else {
                                 LOG_ERROR(cnxString_
                                           << "Failed lookup req_id: " << lookupTopicResponse.request_id()
@@ -1067,7 +1077,7 @@ void ClientConnection::handleIncomingCommand() {
 
                 case BaseCommand::ERROR: {
                     const CommandError& error = incomingCmd_.error();
-                    Result result = getResult(error.error());
+                    Result result = getResult(error.error(), error.message());
                     LOG_WARN(cnxString_ << "Received error response from server: " << result
                                         << (error.has_message() ? (" (" + error.message() + ")") : "")
                                         << " -- req_id: " << error.request_id());
@@ -1080,7 +1090,7 @@ void ClientConnection::handleIncomingCommand() {
                         pendingRequests_.erase(it);
                         lock.unlock();
 
-                        requestData.promise.setFailed(getResult(error.error()));
+                        requestData.promise.setFailed(result);
                         requestData.timer->cancel();
                     } else {
                         PendingGetLastMessageIdRequestsMap::iterator it =
@@ -1090,7 +1100,7 @@ void ClientConnection::handleIncomingCommand() {
                             pendingGetLastMessageIdRequests_.erase(it);
                             lock.unlock();
 
-                            getLastMessageIdPromise.setFailed(getResult(error.error()));
+                            getLastMessageIdPromise.setFailed(result);
                         } else {
                             PendingGetNamespaceTopicsMap::iterator it =
                                 pendingGetNamespaceTopicsRequests_.find(error.request_id());
@@ -1099,7 +1109,7 @@ void ClientConnection::handleIncomingCommand() {
                                 pendingGetNamespaceTopicsRequests_.erase(it);
                                 lock.unlock();
 
-                                getNamespaceTopicsPromise.setFailed(getResult(error.error()));
+                                getNamespaceTopicsPromise.setFailed(result);
                             } else {
                                 lock.unlock();
                             }
@@ -1557,34 +1567,34 @@ void ClientConnection::close(Result result) {
     }
 
     lock.unlock();
-    LOG_INFO(cnxString_ << "Connection closed");
+    LOG_INFO(cnxString_ << "Connection closed with " << result);
 
     for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
-        HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
+        HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
     }
 
     for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
-        HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
+        HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
     }
 
     connectPromise_.setFailed(result);
 
     // Fail all pending requests, all these type are map whose value type contains the Promise object
     for (auto& kv : pendingRequests) {
-        kv.second.promise.setFailed(ResultConnectError);
+        kv.second.promise.setFailed(result);
     }
     for (auto& kv : pendingLookupRequests) {
-        kv.second.promise->setFailed(ResultConnectError);
+        kv.second.promise->setFailed(result);
     }
     for (auto& kv : pendingConsumerStatsMap) {
         LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
-        kv.second.setFailed(ResultConnectError);
+        kv.second.setFailed(result);
     }
     for (auto& kv : pendingGetLastMessageIdRequests) {
-        kv.second.setFailed(ResultConnectError);
+        kv.second.setFailed(result);
     }
     for (auto& kv : pendingGetNamespaceTopicsRequests) {
-        kv.second.setFailed(ResultConnectError);
+        kv.second.setFailed(result);
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 2c983421e67..08adb1d6423 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -30,7 +30,9 @@
 #include <boost/algorithm/string/predicate.hpp>
 #include <sstream>
 #include <stdexcept>
+#include <lib/BinaryProtoLookupService.h>
 #include <lib/HTTPLookupService.h>
+#include <lib/RetryableLookupService.h>
 #include <lib/TopicName.h>
 #include <algorithm>
 #include <random>
@@ -106,17 +108,21 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
     }
     LogUtils::setLoggerFactory(std::move(loggerFactory));
 
+    LookupServicePtr underlyingLookupServicePtr;
     if (serviceNameResolver_.useHttp()) {
         LOG_DEBUG("Using HTTP Lookup");
-        lookupServicePtr_ = std::make_shared<HTTPLookupService>(std::ref(serviceNameResolver_),
-                                                                std::cref(clientConfiguration_),
-                                                                std::cref(clientConfiguration_.getAuthPtr()));
+        underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
+            std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
+            std::cref(clientConfiguration_.getAuthPtr()));
     } else {
         LOG_DEBUG("Using Binary Lookup");
-        lookupServicePtr_ =
+        underlyingLookupServicePtr =
             std::make_shared<BinaryProtoLookupService>(std::ref(serviceNameResolver_), std::ref(pool_),
                                                        std::cref(clientConfiguration_.getListenerName()));
     }
+
+    lookupServicePtr_ = RetryableLookupService::create(
+        underlyingLookupServicePtr, clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_);
 }
 
 ClientImpl::~ClientImpl() { shutdown(); }
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 1f046e2427c..466461ae71e 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -21,7 +21,7 @@
 
 #include <pulsar/Client.h>
 #include "ExecutorService.h"
-#include "BinaryProtoLookupService.h"
+#include "LookupService.h"
 #include "MemoryLimitController.h"
 #include "ConnectionPool.h"
 #include "LookupDataResult.h"
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 5b986632392..61392666a93 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -265,6 +265,9 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
                 }
                 break;
             case CURLE_COULDNT_CONNECT:
+                LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
+                retResult = ResultRetryable;
+                break;
             case CURLE_COULDNT_RESOLVE_PROXY:
             case CURLE_COULDNT_RESOLVE_HOST:
             case CURLE_HTTP_RETURNED_ERROR:
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index 5d2244f7552..506207ea132 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -105,6 +105,11 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
 
     handler->connection_.reset();
 
+    if (result == ResultRetryable) {
+        scheduleReconnection(handler);
+        return;
+    }
+
     switch (state) {
         case Pending:
         case Ready:
@@ -121,14 +126,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
     }
 }
 
-bool HandlerBase::isRetriableError(Result result) {
-    switch (result) {
-        case ResultServiceUnitNotReady:
-            return true;
-        default:
-            return false;
-    }
-}
+bool HandlerBase::isRetriableError(Result result) { return result == ResultRetryable; }
 
 void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
     const auto state = handler->state_.load();
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index d3322aa3805..6682341b2a3 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -25,6 +25,9 @@ namespace pulsar {
 
 const char* strResult(Result result) {
     switch (result) {
+        case ResultRetryable:
+            return "Retryable";
+
         case ResultOk:
             return "Ok";
 
diff --git a/pulsar-client-cpp/lib/RetryableLookupService.h b/pulsar-client-cpp/lib/RetryableLookupService.h
new file mode 100644
index 00000000000..a8f7bfcec3b
--- /dev/null
+++ b/pulsar-client-cpp/lib/RetryableLookupService.h
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <algorithm>
+#include <memory>
+#include "lib/Backoff.h"
+#include "lib/ExecutorService.h"
+#include "lib/LookupService.h"
+#include "lib/SynchronizedHashMap.h"
+#include "lib/LogUtils.h"
+
+namespace pulsar {
+
+class RetryableLookupService : public LookupService,
+                               public std::enable_shared_from_this<RetryableLookupService> {
+   private:
+    friend class PulsarFriend;
+    struct PassKey {
+        explicit PassKey() {}
+    };
+
+   public:
+    template <typename... Args>
+    explicit RetryableLookupService(PassKey, Args&&... args)
+        : RetryableLookupService(std::forward<Args>(args)...) {}
+
+    template <typename... Args>
+    static std::shared_ptr<RetryableLookupService> create(Args&&... args) {
+        return std::make_shared<RetryableLookupService>(PassKey{}, std::forward<Args>(args)...);
+    }
+
+    LookupResultFuture getBroker(const TopicName& topicName) override {
+        return executeAsync<LookupResult>("get-broker-" + topicName.toString(),
+                                          [this, topicName] { return lookupService_->getBroker(topicName); });
+    }
+
+    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
+        return executeAsync<LookupDataResultPtr>(
+            "get-partition-metadata-" + topicName->toString(),
+            [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); });
+    }
+
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override {
+        return executeAsync<NamespaceTopicsPtr>(
+            "get-topics-of-namespace-" + nsName->toString(),
+            [this, nsName] { return lookupService_->getTopicsOfNamespaceAsync(nsName); });
+    }
+
+    template <typename T>
+    Future<Result, T> executeAsync(const std::string& key, std::function<Future<Result, T>()> f) {
+        Promise<Result, T> promise;
+        executeAsyncImpl(key, f, promise, timeout_);
+        return promise.getFuture();
+    }
+
+   private:
+    const std::shared_ptr<LookupService> lookupService_;
+    const TimeDuration timeout_;
+    Backoff backoff_;
+    const ExecutorServiceProviderPtr executorProvider_;
+
+    using Timer = boost::asio::deadline_timer;
+    using TimerPtr = std::unique_ptr<Timer>;
+    SynchronizedHashMap<std::string, TimerPtr> backoffTimers_;
+
+    RetryableLookupService(std::shared_ptr<LookupService> lookupService, int timeoutSeconds,
+                           ExecutorServiceProviderPtr executorProvider)
+        : lookupService_(lookupService),
+          timeout_(boost::posix_time::seconds(timeoutSeconds)),
+          backoff_(boost::posix_time::milliseconds(100), timeout_ + timeout_,
+                   boost::posix_time::milliseconds(0)),
+          executorProvider_(executorProvider) {}
+
+    std::weak_ptr<RetryableLookupService> weak_from_this() noexcept { return shared_from_this(); }
+
+    // NOTE: Set the visibility to fix compilation error in GCC 6
+    template <typename T>
+#ifndef _WIN32
+    __attribute__((visibility("hidden")))
+#endif
+    void
+    executeAsyncImpl(const std::string& key, std::function<Future<Result, T>()> f, Promise<Result, T> promise,
+                     TimeDuration remainingTime) {
+        auto weakSelf = weak_from_this();
+        f().addListener([this, weakSelf, key, f, promise, remainingTime](Result result, const T& value) {
+            auto self = weakSelf.lock();
+            if (!self) {
+                return;
+            }
+
+            if (result == ResultOk) {
+                backoffTimers_.remove(key);
+                promise.setValue(value);
+            } else if (result == ResultRetryable) {
+                if (remainingTime.total_milliseconds() <= 0) {
+                    backoffTimers_.remove(key);
+                    promise.setFailed(ResultTimeout);
+                    return;
+                }
+
+                auto it = backoffTimers_.emplace(
+                    key, TimerPtr{new Timer(executorProvider_->get()->getIOService())});
+                auto& timer = *(it.first->second);
+                auto delay = std::min(backoff_.next(), remainingTime);
+                timer.expires_from_now(delay);
+
+                auto nextRemainingTime = remainingTime - delay;
+                LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds()
+                                       << " ms, remaining time: " << nextRemainingTime.total_milliseconds()
+                                       << " ms");
+                timer.async_wait([this, weakSelf, key, f, promise,
+                                  nextRemainingTime](const boost::system::error_code& ec) {
+                    auto self = weakSelf.lock();
+                    if (!self || ec) {
+                        if (self && ec != boost::asio::error::operation_aborted) {
+                            LOG_ERROR("The timer for " << key << " failed: " << ec.message());
+                        }
+                        // The lookup service has been destructed or the timer has been cancelled
+                        promise.setFailed(ResultTimeout);
+                        return;
+                    }
+                    executeAsyncImpl(key, f, promise, nextRemainingTime);
+                });
+            } else {
+                backoffTimers_.remove(key);
+                promise.setFailed(result);
+            }
+        });
+    }
+
+    DECLARE_LOG_OBJECT()
+};
+
+}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ServiceNameResolver.h b/pulsar-client-cpp/lib/ServiceNameResolver.h
index 60351d8037f..cf7a5832697 100644
--- a/pulsar-client-cpp/lib/ServiceNameResolver.h
+++ b/pulsar-client-cpp/lib/ServiceNameResolver.h
@@ -52,6 +52,8 @@ class ServiceNameResolver {
     const ServiceURI serviceUri_;
     const size_t numAddresses_;
     std::atomic_size_t index_{0};
+
+    friend class PulsarFriend;
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h
index 184ca6a2836..831d1e83bbd 100644
--- a/pulsar-client-cpp/lib/SynchronizedHashMap.h
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -36,6 +36,8 @@ class SynchronizedHashMap {
    public:
     using OptValue = Optional<V>;
     using PairVector = std::vector<std::pair<K, V>>;
+    using MapType = std::unordered_map<K, V>;
+    using Iterator = typename MapType::iterator;
 
     SynchronizedHashMap() = default;
 
@@ -46,9 +48,9 @@ class SynchronizedHashMap {
     }
 
     template <typename... Args>
-    void emplace(Args&&... args) {
+    std::pair<Iterator, bool> emplace(Args&&... args) {
         Lock lock(mutex_);
-        data_.emplace(std::forward<Args>(args)...);
+        return data_.emplace(std::forward<Args>(args)...);
     }
 
     void forEach(std::function<void(const K&, const V&)> f) const {
@@ -105,7 +107,7 @@ class SynchronizedHashMap {
         Lock lock(mutex_);
         auto it = data_.find(key);
         if (it != data_.end()) {
-            auto result = OptValue::of(it->second);
+            auto result = OptValue::of(std::move(it->second));
             data_.erase(it);
             return result;
         } else {
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index e662ecf08f4..b0f500ef0ac 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -85,6 +85,7 @@ class Optional {
      * Create an Optional with the bound value
      */
     static Optional<T> of(const T& value) { return Optional<T>(value); }
+    static Optional<T> of(T&& value) { return Optional<T>(std::move(value)); }
 
     /**
      * Create an empty optional
@@ -95,6 +96,7 @@ class Optional {
 
    private:
     Optional(const T& value) : value_(value), present_(true) {}
+    Optional(T&& value) : value_(std::move(value)), present_(true) {}
 
     T value_;
     bool present_;
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 58c889f074a..9270a61afd4 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -97,14 +97,14 @@ TEST(ClientTest, testSwHwChecksum) {
 
 TEST(ClientTest, testServerConnectError) {
     const std::string topic = "test-server-connect-error";
-    Client client("pulsar://localhost:65535");
+    Client client("pulsar://localhost:65535", ClientConfiguration().setOperationTimeoutSeconds(1));
     Producer producer;
-    ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultTimeout, client.createProducer(topic, producer));
     Consumer consumer;
-    ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer));
+    ASSERT_EQ(ResultTimeout, client.subscribe(topic, "sub", consumer));
     Reader reader;
     ReaderConfiguration readerConf;
-    ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+    ASSERT_EQ(ResultTimeout, client.createReader(topic, MessageId::earliest(), readerConf, reader));
     client.close();
 }
 
@@ -133,6 +133,9 @@ TEST(ClientTest, testConnectTimeout) {
 
     clientLow.close();
     clientDefault.close();
+
+    ASSERT_EQ(futureDefault.wait_for(std::chrono::milliseconds(10)), std::future_status::ready);
+    ASSERT_EQ(futureDefault.get(), ResultConnectError);
 }
 
 TEST(ClientTest, testGetNumberOfReferences) {
@@ -281,3 +284,21 @@ TEST(ClientTest, testWrongListener) {
     ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
     ASSERT_EQ(ResultOk, client.close());
 }
+
+TEST(ClientTest, testMultiBrokerUrl) {
+    const std::string topic = "client-test-multi-broker-url-" + std::to_string(time(nullptr));
+    Client client("pulsar://localhost:6000,localhost");  // the 1st address is not reachable
+
+    Producer producer;
+    PulsarFriend::setServiceUrlIndex(client, 0);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    Consumer consumer;
+    PulsarFriend::setServiceUrlIndex(client, 0);
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Reader reader;
+    PulsarFriend::setServiceUrlIndex(client, 0);
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/LookupServiceTest.cc b/pulsar-client-cpp/tests/LookupServiceTest.cc
index 14c5695a828..77c1e1aaef2 100644
--- a/pulsar-client-cpp/tests/LookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/LookupServiceTest.cc
@@ -28,6 +28,8 @@
 #include <pulsar/Authentication.h>
 #include <boost/exception/all.hpp>
 #include "LogUtils.h"
+#include "RetryableLookupService.h"
+#include "PulsarFriend.h"
 
 #include <algorithm>
 
@@ -135,7 +137,7 @@ static void testMultiAddresses(LookupService& lookupService) {
     auto verifySuccessCount = [&results] {
         // Only half of them succeeded
         ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2);
-        ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultConnectError), numRequests / 2);
+        ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultRetryable), numRequests / 2);
     };
 
     for (int i = 0; i < numRequests; i++) {
@@ -179,3 +181,94 @@ TEST(LookupServiceTest, testMultiAddresses) {
         std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled());
     testMultiAddresses(*httpLookupServicePtr);
 }
+TEST(LookupServiceTest, testRetry) {
+    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true);
+    ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost");
+    // localhost:9999 is assumed to have no broker; the wrapper below bounds retries with a 30 s timeout.
+    auto lookupService = RetryableLookupService::create(
+        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, ""), 30 /* seconds */,
+        executorProvider);
+    // Each lookup below starts at index 0 (the unreachable address) and must still succeed by retrying.
+    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
+    auto topicNamePtr = TopicName::get("lookup-service-test-retry");
+    auto future1 = lookupService->getBroker(*topicNamePtr);
+    LookupService::LookupResult lookupResult;
+    ASSERT_EQ(ResultOk, future1.get(lookupResult));
+    LOG_INFO("getBroker returns logicalAddress: " << lookupResult.logicalAddress
+                                                  << ", physicalAddress: " << lookupResult.physicalAddress);
+    // Partition metadata lookup.
+    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
+    auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
+    LookupDataResultPtr lookupDataResultPtr;
+    ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr));
+    LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions");
+    // Topics-of-namespace lookup.
+    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
+    auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+    NamespaceTopicsPtr namespaceTopicsPtr;
+    ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr));
+    LOG_INFO("getTopicsOfNamespaceAsync returns " << namespaceTopicsPtr->size() << " topics");
+    // Generic executeAsync: the task fails with ResultRetryable twice, then yields 100 on its 3rd call.
+    std::atomic_int retryCount{0};
+    constexpr int totalRetryCount = 3;
+    auto future4 = lookupService->executeAsync<int>("key", [&retryCount]() -> Future<Result, int> {
+        Promise<Result, int> promise;
+        if (++retryCount < totalRetryCount) {
+            LOG_INFO("Retry count: " << retryCount);
+            promise.setFailed(ResultRetryable);
+        } else {
+            LOG_INFO("Retry done with " << retryCount << " times");
+            promise.setValue(100);
+        }
+        return promise.getFuture();
+    });
+    int customResult = 0;
+    ASSERT_EQ(ResultOk, future4.get(customResult));
+    ASSERT_EQ(customResult, 100);
+    ASSERT_EQ(retryCount.load(), totalRetryCount);
+    // No backoff timer may outlive the completed operations.
+    ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
+}
+
+TEST(LookupServiceTest, testTimeout) {
+    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true);
+    ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
+    // No broker is assumed to listen on any of these ports, so every lookup retries until it times out.
+    constexpr int timeoutInSeconds = 2;
+    auto lookupService = RetryableLookupService::create(
+        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, ""), timeoutInSeconds,
+        executorProvider);
+    auto topicNamePtr = TopicName::get("lookup-service-test-timeout");
+    // Time each call with the monotonic steady_clock so wall-clock adjustments cannot skew the interval.
+    decltype(std::chrono::steady_clock::now()) startTime;
+    auto beforeMethod = [&startTime] { startTime = std::chrono::steady_clock::now(); };
+    auto afterMethod = [&startTime](const std::string& name) {
+        auto timeInterval = std::chrono::duration_cast<std::chrono::milliseconds>(
+                                std::chrono::steady_clock::now() - startTime)
+                                .count();
+        LOG_INFO(name << " took " << timeInterval << " ms");
+        ASSERT_GE(timeInterval, timeoutInSeconds * 1000L);
+    };
+    // Each lookup must fail with ResultTimeout, and not before the configured timeout has elapsed.
+    beforeMethod();
+    auto future1 = lookupService->getBroker(*topicNamePtr);
+    LookupService::LookupResult lookupResult;
+    ASSERT_EQ(ResultTimeout, future1.get(lookupResult));
+    afterMethod("getBroker");
+    // Partition metadata lookup.
+    beforeMethod();
+    auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
+    LookupDataResultPtr lookupDataResultPtr;
+    ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr));
+    afterMethod("getPartitionMetadataAsync");
+    // Topics-of-namespace lookup.
+    beforeMethod();
+    auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+    NamespaceTopicsPtr namespaceTopicsPtr;
+    ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr));
+    afterMethod("getTopicsOfNamespaceAsync");
+    // Every backoff timer must have been released once the timeouts fired.
+    ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index b6fb219eabb..d9f9923c7ce 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -25,6 +25,7 @@
 #include "lib/ConsumerImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
 #include "lib/ReaderImpl.h"
+#include "lib/RetryableLookupService.h"
 
 using std::string;
 
@@ -118,5 +119,15 @@ class PulsarFriend {
     static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
         return backoff.firstBackoffTime_;
     }
+    // Test hook: overwrite ServiceNameResolver::index_ (presumably the next service URL to try).
+    static void setServiceUrlIndex(ServiceNameResolver& resolver, size_t index) { resolver.index_ = index; }
+    // Test hook: set index_ of the client's ServiceNameResolver; mutates through impl_ despite const&.
+    static void setServiceUrlIndex(const Client& client, size_t index) {
+        setServiceUrlIndex(client.impl_->serviceNameResolver_, index);
+    }
+    // Test hook: backoffTimers_.size(), read without any lock; call only after all lookups completed.
+    static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) {
+        return lookupService.backoffTimers_.size();
+    }
 };
 }  // namespace pulsar