You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/09/15 07:06:29 UTC
[pulsar] 02/02: [fix][cpp] Support retry and apply operation timeout for lookup requests (#17410)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 590d91f4a70a67415b0ccd232c3294fcec80af5a
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Sep 15 14:38:02 2022 +0800
[fix][cpp] Support retry and apply operation timeout for lookup requests (#17410)
### Motivation
Currently the operation timeout only applies to requests other than
lookups, such as SEND and FLOW. However, lookup requests, which are sent
by `LookupService`, should also be subject to the operation timeout; see
https://github.com/apache/pulsar/blob/7075a5ce0d4a70f52625ac8c3d0c48894442b72a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java#L1019-L1025
In addition, a failed lookup was never retried, even when it failed for a
retryable reason. For example, if some of the configured brokers were
down, the C++ client would fail immediately.
Therefore, this PR retries lookups in the following cases:
- The connection cannot be established, unless the failure is a
connection timeout, which is already governed by the connection timeout.
- A `ServiceNotReady` error is received, unless it is caused by a
`PulsarServerException` (e.g. the listener name is wrong).
The operation timeout is then applied to bound the retries, so that a
lookup is not retried forever.
### Modifications
- Add a `ResultRetryable` error code, which should only be used
internally. Complete the futures with this error in the cases described
above.
- Add a `RetryableLookupService` implementation that retries lookups
based on the backoff policy. In `ClientImpl`, replace the direct usages
of the `LookupService` implementations with this class.
The following tests are added:
- `ClientTest.testMultiBrokerUrl`: verify that when multiple brokers are
configured and one of them is unavailable, creating a producer, consumer
or reader can still succeed.
- `LookupServiceTest.testRetry`: verify that all lookup methods can be retried.
- `LookupServiceTest.testTimeout`: verify that all lookup methods
complete with `ResultTimeout` if no brokers are available.
### TODO
In the future, we should add a dedicated lookup timeout (instead of
reusing the operation timeout) for lookup requests, as well as a separate
lookup connection pool; see PIP-91.
* Handle ResultRetryable in handleDisconnection
(cherry picked from commit 86ea31b2409a7b62624278dc133d443ca62ffb76)
---
pulsar-client-cpp/include/pulsar/Result.h | 3 +-
pulsar-client-cpp/lib/BinaryProtoLookupService.cc | 3 +-
pulsar-client-cpp/lib/ClientConnection.cc | 54 ++++----
pulsar-client-cpp/lib/ClientImpl.cc | 14 +-
pulsar-client-cpp/lib/ClientImpl.h | 2 +-
pulsar-client-cpp/lib/HTTPLookupService.cc | 3 +
pulsar-client-cpp/lib/HandlerBase.cc | 14 +-
pulsar-client-cpp/lib/Result.cc | 3 +
pulsar-client-cpp/lib/RetryableLookupService.h | 151 ++++++++++++++++++++++
pulsar-client-cpp/lib/ServiceNameResolver.h | 2 +
pulsar-client-cpp/lib/SynchronizedHashMap.h | 8 +-
pulsar-client-cpp/lib/Utils.h | 2 +
pulsar-client-cpp/tests/ClientTest.cc | 29 ++++-
pulsar-client-cpp/tests/LookupServiceTest.cc | 95 +++++++++++++-
pulsar-client-cpp/tests/PulsarFriend.h | 11 ++
15 files changed, 348 insertions(+), 46 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h
index a7fdd04f99a..cc7b457528e 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -29,7 +29,8 @@ namespace pulsar {
*/
enum Result
{
- ResultOk, /// Operation successful
+ ResultRetryable = -1, /// An internal error code used for retry
+ ResultOk = 0, /// Operation successful
ResultUnknownError, /// Unknown error happened on broker
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index bff29237c71..ff42b91b3f7 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -111,7 +111,6 @@ void BinaryProtoLookupService::sendPartitionMetadataLookupRequest(const std::str
LookupDataResultPromisePtr promise) {
if (result != ResultOk) {
promise->setFailed(result);
- Future<Result, LookupDataResultPtr> future = promise->getFuture();
return;
}
LookupDataResultPromisePtr lookupPromise = std::make_shared<LookupDataResultPromise>();
@@ -160,7 +159,7 @@ void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const std::string
const ClientConnectionWeakPtr& clientCnx,
NamespaceTopicsPromisePtr promise) {
if (result != ResultOk) {
- promise->setFailed(ResultConnectError);
+ promise->setFailed(result);
return;
}
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index e6cab07cb9b..45733248c1e 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -51,7 +51,7 @@ static const uint32_t DefaultBufferSize = 64 * 1024;
static const int KeepAliveIntervalInSeconds = 30;
// Convert error codes from protobuf to client API Result
-static Result getResult(ServerError serverError) {
+static Result getResult(ServerError serverError, const std::string& message) {
switch (serverError) {
case UnknownError:
return ResultUnknownError;
@@ -75,7 +75,9 @@ static Result getResult(ServerError serverError) {
return ResultConsumerBusy;
case ServiceNotReady:
- return ResultServiceUnitNotReady;
+ // If the error is not caused by a PulsarServerException, treat it as retryable.
+ return (message.find("PulsarServerException") == std::string::npos) ? ResultRetryable
+ : ResultServiceUnitNotReady;
case ProducerBlockedQuotaExceededError:
return ResultProducerBlockedQuotaExceededError;
@@ -138,7 +140,7 @@ static Result getResult(ServerError serverError) {
}
inline std::ostream& operator<<(std::ostream& os, ServerError error) {
- os << getResult(error);
+ os << getResult(error, "");
return os;
}
@@ -182,7 +184,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
consumerStatsRequestTimer_ = executor_->createDeadlineTimer();
} catch (const boost::system::system_error& e) {
LOG_ERROR("Failed to initialize connection: " << e.what());
- close();
+ close(ResultRetryable);
return;
}
@@ -368,7 +370,7 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
cnxString_ = cnxStringStream.str();
} catch (const boost::system::system_error& e) {
LOG_ERROR("Failed to get endpoint: " << e.what());
- close();
+ close(ResultRetryable);
return;
}
if (logicalAddress_ == physicalAddress_) {
@@ -450,11 +452,16 @@ void ClientConnection::handleTcpConnected(const boost::system::error_code& err,
std::bind(&ClientConnection::handleTcpConnected, shared_from_this(),
std::placeholders::_1, ++endpointIterator));
} else {
- close();
+ if (err == boost::asio::error::operation_aborted) {
+ // TCP connect timeout, which is not retryable
+ close();
+ } else {
+ close(ResultRetryable);
+ }
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
- close();
+ close(ResultRetryable);
}
}
@@ -913,7 +920,8 @@ void ClientConnection::handleIncomingCommand() {
<< " error: " << partitionMetadataResponse.error()
<< " msg: " << partitionMetadataResponse.message());
checkServerError(partitionMetadataResponse.error());
- lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error()));
+ lookupDataPromise->setFailed(getResult(partitionMetadataResponse.error(),
+ partitionMetadataResponse.message()));
} else {
LOG_ERROR(cnxString_ << "Failed partition-metadata lookup req_id: "
<< partitionMetadataResponse.request_id()
@@ -952,7 +960,8 @@ void ClientConnection::handleIncomingCommand() {
LOG_ERROR(cnxString_ << " Failed to get consumer stats - "
<< consumerStatsResponse.error_message());
}
- consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code()));
+ consumerStatsPromise.setFailed(getResult(consumerStatsResponse.error_code(),
+ consumerStatsResponse.error_message()));
} else {
LOG_DEBUG(cnxString_ << "ConsumerStatsResponse command - Received consumer stats "
"response from server. req_id: "
@@ -1000,7 +1009,8 @@ void ClientConnection::handleIncomingCommand() {
<< " error: " << lookupTopicResponse.error()
<< " msg: " << lookupTopicResponse.message());
checkServerError(lookupTopicResponse.error());
- lookupDataPromise->setFailed(getResult(lookupTopicResponse.error()));
+ lookupDataPromise->setFailed(
+ getResult(lookupTopicResponse.error(), lookupTopicResponse.message()));
} else {
LOG_ERROR(cnxString_
<< "Failed lookup req_id: " << lookupTopicResponse.request_id()
@@ -1067,7 +1077,7 @@ void ClientConnection::handleIncomingCommand() {
case BaseCommand::ERROR: {
const CommandError& error = incomingCmd_.error();
- Result result = getResult(error.error());
+ Result result = getResult(error.error(), error.message());
LOG_WARN(cnxString_ << "Received error response from server: " << result
<< (error.has_message() ? (" (" + error.message() + ")") : "")
<< " -- req_id: " << error.request_id());
@@ -1080,7 +1090,7 @@ void ClientConnection::handleIncomingCommand() {
pendingRequests_.erase(it);
lock.unlock();
- requestData.promise.setFailed(getResult(error.error()));
+ requestData.promise.setFailed(result);
requestData.timer->cancel();
} else {
PendingGetLastMessageIdRequestsMap::iterator it =
@@ -1090,7 +1100,7 @@ void ClientConnection::handleIncomingCommand() {
pendingGetLastMessageIdRequests_.erase(it);
lock.unlock();
- getLastMessageIdPromise.setFailed(getResult(error.error()));
+ getLastMessageIdPromise.setFailed(result);
} else {
PendingGetNamespaceTopicsMap::iterator it =
pendingGetNamespaceTopicsRequests_.find(error.request_id());
@@ -1099,7 +1109,7 @@ void ClientConnection::handleIncomingCommand() {
pendingGetNamespaceTopicsRequests_.erase(it);
lock.unlock();
- getNamespaceTopicsPromise.setFailed(getResult(error.error()));
+ getNamespaceTopicsPromise.setFailed(result);
} else {
lock.unlock();
}
@@ -1557,34 +1567,34 @@ void ClientConnection::close(Result result) {
}
lock.unlock();
- LOG_INFO(cnxString_ << "Connection closed");
+ LOG_INFO(cnxString_ << "Connection closed with " << result);
for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
- HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
+ HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
}
for (ConsumersMap::iterator it = consumers.begin(); it != consumers.end(); ++it) {
- HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
+ HandlerBase::handleDisconnection(result, shared_from_this(), it->second);
}
connectPromise_.setFailed(result);
// Fail all pending requests, all these type are map whose value type contains the Promise object
for (auto& kv : pendingRequests) {
- kv.second.promise.setFailed(ResultConnectError);
+ kv.second.promise.setFailed(result);
}
for (auto& kv : pendingLookupRequests) {
- kv.second.promise->setFailed(ResultConnectError);
+ kv.second.promise->setFailed(result);
}
for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
- kv.second.setFailed(ResultConnectError);
+ kv.second.setFailed(result);
}
for (auto& kv : pendingGetLastMessageIdRequests) {
- kv.second.setFailed(ResultConnectError);
+ kv.second.setFailed(result);
}
for (auto& kv : pendingGetNamespaceTopicsRequests) {
- kv.second.setFailed(ResultConnectError);
+ kv.second.setFailed(result);
}
}
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 2c983421e67..08adb1d6423 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -30,7 +30,9 @@
#include <boost/algorithm/string/predicate.hpp>
#include <sstream>
#include <stdexcept>
+#include <lib/BinaryProtoLookupService.h>
#include <lib/HTTPLookupService.h>
+#include <lib/RetryableLookupService.h>
#include <lib/TopicName.h>
#include <algorithm>
#include <random>
@@ -106,17 +108,21 @@ ClientImpl::ClientImpl(const std::string& serviceUrl, const ClientConfiguration&
}
LogUtils::setLoggerFactory(std::move(loggerFactory));
+ LookupServicePtr underlyingLookupServicePtr;
if (serviceNameResolver_.useHttp()) {
LOG_DEBUG("Using HTTP Lookup");
- lookupServicePtr_ = std::make_shared<HTTPLookupService>(std::ref(serviceNameResolver_),
- std::cref(clientConfiguration_),
- std::cref(clientConfiguration_.getAuthPtr()));
+ underlyingLookupServicePtr = std::make_shared<HTTPLookupService>(
+ std::ref(serviceNameResolver_), std::cref(clientConfiguration_),
+ std::cref(clientConfiguration_.getAuthPtr()));
} else {
LOG_DEBUG("Using Binary Lookup");
- lookupServicePtr_ =
+ underlyingLookupServicePtr =
std::make_shared<BinaryProtoLookupService>(std::ref(serviceNameResolver_), std::ref(pool_),
std::cref(clientConfiguration_.getListenerName()));
}
+
+ lookupServicePtr_ = RetryableLookupService::create(
+ underlyingLookupServicePtr, clientConfiguration_.getOperationTimeoutSeconds(), ioExecutorProvider_);
}
ClientImpl::~ClientImpl() { shutdown(); }
diff --git a/pulsar-client-cpp/lib/ClientImpl.h b/pulsar-client-cpp/lib/ClientImpl.h
index 1f046e2427c..466461ae71e 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -21,7 +21,7 @@
#include <pulsar/Client.h>
#include "ExecutorService.h"
-#include "BinaryProtoLookupService.h"
+#include "LookupService.h"
#include "MemoryLimitController.h"
#include "ConnectionPool.h"
#include "LookupDataResult.h"
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 5b986632392..61392666a93 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -265,6 +265,9 @@ Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &
}
break;
case CURLE_COULDNT_CONNECT:
+ LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
+ retResult = ResultRetryable;
+ break;
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index 5d2244f7552..506207ea132 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -105,6 +105,11 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
handler->connection_.reset();
+ if (result == ResultRetryable) {
+ scheduleReconnection(handler);
+ return;
+ }
+
switch (state) {
case Pending:
case Ready:
@@ -121,14 +126,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
}
}
-bool HandlerBase::isRetriableError(Result result) {
- switch (result) {
- case ResultServiceUnitNotReady:
- return true;
- default:
- return false;
- }
-}
+// A failed (re)connection attempt is retried only for the internal ResultRetryable code; every other
+// result is treated as final.
+bool HandlerBase::isRetriableError(Result result) {
+    switch (result) {
+        case ResultRetryable:
+            return true;
+        default:
+            return false;
+    }
+}
void HandlerBase::scheduleReconnection(HandlerBasePtr handler) {
const auto state = handler->state_.load();
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index d3322aa3805..6682341b2a3 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -25,6 +25,9 @@ namespace pulsar {
const char* strResult(Result result) {
switch (result) {
+ case ResultRetryable:
+ return "Retryable";
+
case ResultOk:
return "Ok";
diff --git a/pulsar-client-cpp/lib/RetryableLookupService.h b/pulsar-client-cpp/lib/RetryableLookupService.h
new file mode 100644
index 00000000000..a8f7bfcec3b
--- /dev/null
+++ b/pulsar-client-cpp/lib/RetryableLookupService.h
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <algorithm>
+#include <atomic>
+#include <memory>
+#include <string>
+#include "lib/Backoff.h"
+#include "lib/ExecutorService.h"
+#include "lib/LookupService.h"
+#include "lib/SynchronizedHashMap.h"
+#include "lib/LogUtils.h"
+
+namespace pulsar {
+
+/**
+ * A LookupService decorator that retries the lookups of an underlying LookupService.
+ *
+ * An attempt that completes with the internal ResultRetryable code is retried after a backoff delay;
+ * any other result is passed through to the caller unchanged. Once the backoff delays have used up the
+ * timeout given at construction, the call fails with ResultTimeout instead.
+ *
+ * Instances can only be created through create(), which returns a std::shared_ptr: the retry
+ * callbacks hold a weak reference to the service.
+ */
+class RetryableLookupService : public LookupService,
+                               public std::enable_shared_from_this<RetryableLookupService> {
+   private:
+    friend class PulsarFriend;
+    struct PassKey {
+        explicit PassKey() {}
+    };
+
+   public:
+    // Public only so that std::make_shared can call it; the private PassKey restricts it to create().
+    template <typename... Args>
+    explicit RetryableLookupService(PassKey, Args&&... args)
+        : RetryableLookupService(std::forward<Args>(args)...) {}
+
+    /**
+     * Create a RetryableLookupService.
+     *
+     * The arguments are forwarded to the private constructor: the underlying lookup service, the
+     * timeout in seconds, and the executor provider whose IO services run the backoff timers.
+     */
+    template <typename... Args>
+    static std::shared_ptr<RetryableLookupService> create(Args&&... args) {
+        return std::make_shared<RetryableLookupService>(PassKey{}, std::forward<Args>(args)...);
+    }
+
+    LookupResultFuture getBroker(const TopicName& topicName) override {
+        return executeAsync<LookupResult>("get-broker-" + topicName.toString(),
+                                          [this, topicName] { return lookupService_->getBroker(topicName); });
+    }
+
+    Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const TopicNamePtr& topicName) override {
+        return executeAsync<LookupDataResultPtr>(
+            "get-partition-metadata-" + topicName->toString(),
+            [this, topicName] { return lookupService_->getPartitionMetadataAsync(topicName); });
+    }
+
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const NamespaceNamePtr& nsName) override {
+        return executeAsync<NamespaceTopicsPtr>(
+            "get-topics-of-namespace-" + nsName->toString(),
+            [this, nsName] { return lookupService_->getTopicsOfNamespaceAsync(nsName); });
+    }
+
+    /**
+     * Run `f`, retrying it while it completes with ResultRetryable.
+     *
+     * @param key describes the operation; it appears in the retry logs and prefixes the key of this
+     *        call's backoff timer
+     * @param f starts one attempt of the operation
+     * @return a future completed with the first result other than ResultRetryable, or with
+     *         ResultTimeout once the backoff delays have used up the timeout
+     */
+    template <typename T>
+    Future<Result, T> executeAsync(const std::string& key, std::function<Future<Result, T>()> f) {
+        Promise<Result, T> promise;
+        // Give every call its own timer key. With a shared key, concurrent lookups of the same topic
+        // would reuse one timer: rescheduling one call cancels the other call's pending wait, and
+        // completing one call destroys it, making the other call fail with ResultTimeout.
+        auto callKey = key + "-" + std::to_string(nextCallId_++);
+        // Give every call its own Backoff so that delays do not carry over from earlier calls and no
+        // backoff state is shared between calls completing on different threads.
+        auto backoff = std::make_shared<Backoff>(boost::posix_time::milliseconds(100), timeout_ + timeout_,
+                                                 boost::posix_time::milliseconds(0));
+        executeAsyncImpl(callKey, f, promise, timeout_, backoff);
+        return promise.getFuture();
+    }
+
+   private:
+    const std::shared_ptr<LookupService> lookupService_;
+    const TimeDuration timeout_;
+    const ExecutorServiceProviderPtr executorProvider_;
+    // Source of the per-call suffixes that keep the keys of backoffTimers_ unique
+    std::atomic_size_t nextCallId_{0};
+
+    using Timer = boost::asio::deadline_timer;
+    using TimerPtr = std::unique_ptr<Timer>;
+    using BackoffPtr = std::shared_ptr<Backoff>;
+    // Backoff timers of the calls that have been retried at least once and have not finished yet
+    SynchronizedHashMap<std::string, TimerPtr> backoffTimers_;
+
+    RetryableLookupService(std::shared_ptr<LookupService> lookupService, int timeoutSeconds,
+                           ExecutorServiceProviderPtr executorProvider)
+        : lookupService_(lookupService),
+          timeout_(boost::posix_time::seconds(timeoutSeconds)),
+          executorProvider_(executorProvider) {}
+
+    std::weak_ptr<RetryableLookupService> weak_from_this() noexcept { return shared_from_this(); }
+
+    // Start one attempt of `f`. On ResultRetryable, schedule the next attempt on this call's timer.
+    // `remainingTime` is the part of the timeout not yet consumed by backoff delays; the time spent
+    // inside f() itself is not deducted from it.
+    // NOTE: Set the visibility to fix compilation error in GCC 6
+    template <typename T>
+#ifndef _WIN32
+    __attribute__((visibility("hidden")))
+#endif
+    void
+    executeAsyncImpl(const std::string& key, std::function<Future<Result, T>()> f, Promise<Result, T> promise,
+                     TimeDuration remainingTime, BackoffPtr backoff) {
+        auto weakSelf = weak_from_this();
+        f().addListener([this, weakSelf, key, f, promise, remainingTime, backoff](Result result,
+                                                                                   const T& value) {
+            auto self = weakSelf.lock();
+            if (!self) {
+                // The service has been destroyed, so no retry can be scheduled. Still complete the
+                // promise so that callers waiting on it are not left hanging.
+                if (result == ResultOk) {
+                    promise.setValue(value);
+                } else {
+                    promise.setFailed(result == ResultRetryable ? ResultTimeout : result);
+                }
+                return;
+            }
+
+            if (result == ResultOk) {
+                backoffTimers_.remove(key);
+                promise.setValue(value);
+            } else if (result == ResultRetryable) {
+                if (remainingTime.total_milliseconds() <= 0) {
+                    backoffTimers_.remove(key);
+                    promise.setFailed(ResultTimeout);
+                    return;
+                }
+
+                // The key is unique to this call, so an existing entry can only be this call's own
+                // (already fired) timer
+                auto it = backoffTimers_.emplace(
+                    key, TimerPtr{new Timer(executorProvider_->get()->getIOService())});
+                auto& timer = *(it.first->second);
+                auto delay = std::min(backoff->next(), remainingTime);
+                timer.expires_from_now(delay);
+
+                auto nextRemainingTime = remainingTime - delay;
+                LOG_INFO("Reschedule " << key << " for " << delay.total_milliseconds()
+                                       << " ms, remaining time: " << nextRemainingTime.total_milliseconds()
+                                       << " ms");
+                timer.async_wait([this, weakSelf, key, f, promise, nextRemainingTime,
+                                  backoff](const boost::system::error_code& ec) {
+                    auto self = weakSelf.lock();
+                    if (!self || ec) {
+                        if (self) {
+                            if (ec != boost::asio::error::operation_aborted) {
+                                LOG_ERROR("The timer for " << key << " failed: " << ec.message());
+                            }
+                            // This call is over: drop its timer so that the entry does not leak
+                            backoffTimers_.remove(key);
+                        }
+                        // The lookup service has been destructed or the timer has been cancelled
+                        promise.setFailed(ResultTimeout);
+                        return;
+                    }
+                    executeAsyncImpl(key, f, promise, nextRemainingTime, backoff);
+                });
+            } else {
+                backoffTimers_.remove(key);
+                promise.setFailed(result);
+            }
+        });
+    }
+
+    DECLARE_LOG_OBJECT()
+};
+
+} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ServiceNameResolver.h b/pulsar-client-cpp/lib/ServiceNameResolver.h
index 60351d8037f..cf7a5832697 100644
--- a/pulsar-client-cpp/lib/ServiceNameResolver.h
+++ b/pulsar-client-cpp/lib/ServiceNameResolver.h
@@ -52,6 +52,8 @@ class ServiceNameResolver {
const ServiceURI serviceUri_;
const size_t numAddresses_;
std::atomic_size_t index_{0};
+
+ friend class PulsarFriend;
};
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/SynchronizedHashMap.h b/pulsar-client-cpp/lib/SynchronizedHashMap.h
index 184ca6a2836..831d1e83bbd 100644
--- a/pulsar-client-cpp/lib/SynchronizedHashMap.h
+++ b/pulsar-client-cpp/lib/SynchronizedHashMap.h
@@ -36,6 +36,8 @@ class SynchronizedHashMap {
public:
using OptValue = Optional<V>;
using PairVector = std::vector<std::pair<K, V>>;
+ using MapType = std::unordered_map<K, V>;
+ using Iterator = typename MapType::iterator;
SynchronizedHashMap() = default;
@@ -46,9 +48,9 @@ class SynchronizedHashMap {
}
template <typename... Args>
- void emplace(Args&&... args) {
+ std::pair<Iterator, bool> emplace(Args&&... args) {
Lock lock(mutex_);
- data_.emplace(std::forward<Args>(args)...);
+ return data_.emplace(std::forward<Args>(args)...);
}
void forEach(std::function<void(const K&, const V&)> f) const {
@@ -105,7 +107,7 @@ class SynchronizedHashMap {
Lock lock(mutex_);
auto it = data_.find(key);
if (it != data_.end()) {
- auto result = OptValue::of(it->second);
+ auto result = OptValue::of(std::move(it->second));
data_.erase(it);
return result;
} else {
diff --git a/pulsar-client-cpp/lib/Utils.h b/pulsar-client-cpp/lib/Utils.h
index e662ecf08f4..b0f500ef0ac 100644
--- a/pulsar-client-cpp/lib/Utils.h
+++ b/pulsar-client-cpp/lib/Utils.h
@@ -85,6 +85,7 @@ class Optional {
* Create an Optional with the bound value
*/
static Optional<T> of(const T& value) { return Optional<T>(value); }
+ /**
+ * Create an Optional by moving `value` into it; lets move-only types (e.g. std::unique_ptr) be stored
+ */
+ static Optional<T> of(T&& value) { return Optional<T>(std::move(value)); }
/**
* Create an empty optional
@@ -95,6 +96,7 @@ class Optional {
private:
Optional(const T& value) : value_(value), present_(true) {}
+ // Move-construct the bound value instead of copying it
+ Optional(T&& value) : value_(std::move(value)), present_(true) {}
T value_;
bool present_;
diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index 58c889f074a..9270a61afd4 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -97,14 +97,14 @@ TEST(ClientTest, testSwHwChecksum) {
TEST(ClientTest, testServerConnectError) {
const std::string topic = "test-server-connect-error";
- Client client("pulsar://localhost:65535");
+ Client client("pulsar://localhost:65535", ClientConfiguration().setOperationTimeoutSeconds(1));
Producer producer;
- ASSERT_EQ(ResultConnectError, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultTimeout, client.createProducer(topic, producer));
Consumer consumer;
- ASSERT_EQ(ResultConnectError, client.subscribe(topic, "sub", consumer));
+ ASSERT_EQ(ResultTimeout, client.subscribe(topic, "sub", consumer));
Reader reader;
ReaderConfiguration readerConf;
- ASSERT_EQ(ResultConnectError, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+ ASSERT_EQ(ResultTimeout, client.createReader(topic, MessageId::earliest(), readerConf, reader));
client.close();
}
@@ -133,6 +133,9 @@ TEST(ClientTest, testConnectTimeout) {
clientLow.close();
clientDefault.close();
+
+ ASSERT_EQ(futureDefault.wait_for(std::chrono::milliseconds(10)), std::future_status::ready);
+ ASSERT_EQ(futureDefault.get(), ResultConnectError);
}
TEST(ClientTest, testGetNumberOfReferences) {
@@ -281,3 +284,21 @@ TEST(ClientTest, testWrongListener) {
ASSERT_EQ(PulsarFriend::getConsumers(client).size(), 0);
ASSERT_EQ(ResultOk, client.close());
}
+
+// Verify that a producer, a consumer and a reader can all be created even though the first of the two
+// configured broker addresses (localhost:6000) is not reachable.
+TEST(ClientTest, testMultiBrokerUrl) {
+    const auto topic = "client-test-multi-broker-url-" + std::to_string(time(nullptr));
+    // the 1st address is not reachable
+    Client client("pulsar://localhost:6000,localhost");
+    // Point the resolver back at the unreachable address before each creation
+    const auto resetToFirstBroker = [&client] { PulsarFriend::setServiceUrlIndex(client, 0); };
+
+    Producer producer;
+    resetToFirstBroker();
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+
+    Consumer consumer;
+    resetToFirstBroker();
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
+
+    Reader reader;
+    ReaderConfiguration readerConf;
+    resetToFirstBroker();
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), readerConf, reader));
+    client.close();
+}
diff --git a/pulsar-client-cpp/tests/LookupServiceTest.cc b/pulsar-client-cpp/tests/LookupServiceTest.cc
index 14c5695a828..77c1e1aaef2 100644
--- a/pulsar-client-cpp/tests/LookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/LookupServiceTest.cc
@@ -28,6 +28,8 @@
#include <pulsar/Authentication.h>
#include <boost/exception/all.hpp>
#include "LogUtils.h"
+#include "RetryableLookupService.h"
+#include "PulsarFriend.h"
#include <algorithm>
@@ -135,7 +137,7 @@ static void testMultiAddresses(LookupService& lookupService) {
auto verifySuccessCount = [&results] {
// Only half of them succeeded
ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2);
- ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultConnectError), numRequests / 2);
+ ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultRetryable), numRequests / 2);
};
for (int i = 0; i < numRequests; i++) {
@@ -179,3 +181,94 @@ TEST(LookupServiceTest, testMultiAddresses) {
std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled());
testMultiAddresses(*httpLookupServicePtr);
}
+// Verify that every lookup method is retried. Before each call the resolver is reset to its first
+// address, localhost:9999 (presumably unreachable), so each lookup is expected to succeed only after
+// being retried against the second address. Then verify executeAsync() with a custom task.
+TEST(LookupServiceTest, testRetry) {
+    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true);
+    ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost");
+
+    auto lookupService = RetryableLookupService::create(
+        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, ""), 30 /* seconds */,
+        executorProvider);
+
+    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
+    auto topicNamePtr = TopicName::get("lookup-service-test-retry");
+    auto future1 = lookupService->getBroker(*topicNamePtr);
+    LookupService::LookupResult lookupResult;
+    ASSERT_EQ(ResultOk, future1.get(lookupResult));
+    LOG_INFO("getBroker returns logicalAddress: " << lookupResult.logicalAddress
+                                                  << ", physicalAddress: " << lookupResult.physicalAddress);
+
+    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
+    auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
+    LookupDataResultPtr lookupDataResultPtr;
+    ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr));
+    LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions");
+
+    PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
+    auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+    NamespaceTopicsPtr namespaceTopicsPtr;
+    ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr));
+    LOG_INFO("getTopicsOfNamespaceAsync returns " << namespaceTopicsPtr->size() << " topics");
+
+    // A custom task that fails with ResultRetryable until its totalRetryCount-th attempt succeeds
+    std::atomic_int retryCount{0};
+    constexpr int totalRetryCount = 3;
+    auto future4 = lookupService->executeAsync<int>("key", [&retryCount]() -> Future<Result, int> {
+        Promise<Result, int> promise;
+        if (++retryCount < totalRetryCount) {
+            LOG_INFO("Retry count: " << retryCount);
+            promise.setFailed(ResultRetryable);
+        } else {
+            LOG_INFO("Retry done with " << retryCount << " times");
+            promise.setValue(100);
+        }
+        return promise.getFuture();
+    });
+    int customResult = 0;
+    ASSERT_EQ(ResultOk, future4.get(customResult));
+    ASSERT_EQ(customResult, 100);
+    ASSERT_EQ(retryCount.load(), totalRetryCount);
+
+    // Every call has completed, so no backoff timer may be left behind
+    ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
+}
+
+// Verify that when none of the configured addresses is reachable (presumably nothing listens on these
+// ports), every lookup method fails with ResultTimeout, and not before the whole timeout has elapsed.
+TEST(LookupServiceTest, testTimeout) {
+    auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
+    ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true);
+    ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
+
+    constexpr int timeoutInSeconds = 2;
+    auto lookupService = RetryableLookupService::create(
+        std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, ""), timeoutInSeconds,
+        executorProvider);
+    auto topicNamePtr = TopicName::get("lookup-service-test-retry");
+
+    decltype(std::chrono::high_resolution_clock::now()) startTime;
+    auto beforeMethod = [&startTime] { startTime = std::chrono::high_resolution_clock::now(); };
+    auto afterMethod = [&startTime](const std::string& name) {
+        // Elapsed time since beforeMethod(), in milliseconds
+        auto timeInterval = std::chrono::duration_cast<std::chrono::milliseconds>(
+                                std::chrono::high_resolution_clock::now() - startTime)
+                                .count();
+        LOG_INFO(name << " took " << timeInterval << " ms");
+        // Retries only stop once the backoff delays add up to the timeout, so the failure cannot be
+        // reported earlier than that
+        ASSERT_GE(timeInterval, timeoutInSeconds * 1000L);
+    };
+
+    beforeMethod();
+    auto future1 = lookupService->getBroker(*topicNamePtr);
+    LookupService::LookupResult lookupResult;
+    ASSERT_EQ(ResultTimeout, future1.get(lookupResult));
+    afterMethod("getBroker");
+
+    beforeMethod();
+    auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
+    LookupDataResultPtr lookupDataResultPtr;
+    ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr));
+    afterMethod("getPartitionMetadataAsync");
+
+    beforeMethod();
+    auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
+    NamespaceTopicsPtr namespaceTopicsPtr;
+    ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr));
+    afterMethod("getTopicsOfNamespaceAsync");
+
+    // Every call has finished, so no backoff timer may be left behind
+    ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
+}
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index b6fb219eabb..d9f9923c7ce 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -25,6 +25,7 @@
#include "lib/ConsumerImpl.h"
#include "lib/MultiTopicsConsumerImpl.h"
#include "lib/ReaderImpl.h"
+#include "lib/RetryableLookupService.h"
using std::string;
@@ -118,5 +119,15 @@ class PulsarFriend {
static boost::posix_time::ptime& getFirstBackoffTime(Backoff& backoff) {
return backoff.firstBackoffTime_;
}
+
+    // Overwrite the resolver's current address index (its atomic index_ member)
+    static void setServiceUrlIndex(ServiceNameResolver& resolver, size_t index) {
+        resolver.index_.store(index);
+    }
+
+    // Overwrite the address index of the ServiceNameResolver owned by `client`'s implementation
+    static void setServiceUrlIndex(const Client& client, size_t index) {
+        auto& resolver = client.impl_->serviceNameResolver_;
+        setServiceUrlIndex(resolver, index);
+    }
+
+    // Number of backoff timers currently held by `lookupService`
+    static size_t getNumberOfPendingTasks(const RetryableLookupService& lookupService) {
+        const auto& timers = lookupService.backoffTimers_;
+        return timers.size();
+    }
};
} // namespace pulsar