You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/12 07:16:52 UTC
[pulsar] branch branch-2.6 updated: [C++] Fix some pending requests
may never complete when broker's down (#8232)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new bf83b1b [C++] Fix some pending requests may never complete when broker's down (#8232)
bf83b1b is described below
commit bf83b1b29dcb77a740b27d9ab2369fdf78b1d4b4
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Oct 12 15:14:39 2020 +0800
[C++] Fix some pending requests may never complete when broker's down (#8232)
Fixes #8230
### Motivation
When `ClientConnection::close` is called, the promises of some pending requests were never completed. As a result, methods that wait for those promises to complete, such as `Reader::hasMessageAvailable`, could block forever.
### Modifications
- Complete pending `GetLastMessageId` and `GetNamespaceTopics` requests (failing them with `ResultConnectError`) in `ClientConnection::close`.
- Refactor `ClientConnection::close` to move the internal fields out while holding the lock, instead of swapping them. The moved-out fields are then processed after the lock is released.
- Log an error before the connection is closed because of a read error.
### Verifying this change
Reproduce #8230 and forcibly kill the broker: `Reader::hasMessageAvailable` now returns `ResultConnectError` instead of hanging.
(cherry picked from commit 5e60775fcf607be202641a45166c5d8781e3de87)
---
pulsar-client-cpp/lib/ClientConnection.cc | 72 +++++++++++++++----------------
1 file changed, 35 insertions(+), 37 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index ebb7268..6394d25 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -498,6 +498,9 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
incomingBuffer_.bytesWritten(bytesTransferred);
if (err || bytesTransferred == 0) {
+ if (err) {
+ LOG_ERROR(cnxString_ << "Read failed: " << err.message());
+ } // else: bytesTransferred == 0, which means server has closed the connection
close();
} else if (bytesTransferred < minReadSize) {
// Read the remaining part, use a slice of buffer to write on the next
@@ -1378,28 +1381,39 @@ void ClientConnection::close() {
state_ = Disconnected;
boost::system::error_code err;
socket_->close(err);
- ConsumersMap consumers;
- consumers.swap(consumers_);
- ProducersMap producers;
- producers.swap(producers_);
- lock.unlock();
- LOG_INFO(cnxString_ << "Connection closed");
+ if (tlsSocket_) {
+ tlsSocket_->lowest_layer().close();
+ }
+
+ if (executor_) {
+ executor_.reset();
+ }
+
+ // Move the internal fields to process them after `mutex_` was unlocked
+ auto consumers = std::move(consumers_);
+ auto producers = std::move(producers_);
+ auto pendingRequests = std::move(pendingRequests_);
+ auto pendingLookupRequests = std::move(pendingLookupRequests_);
+ auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
+ auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
+ auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
+
+ numOfPendingLookupRequest_ = 0;
if (keepAliveTimer_) {
- lock.lock();
keepAliveTimer_->cancel();
keepAliveTimer_.reset();
- lock.unlock();
}
if (consumerStatsRequestTimer_) {
- lock.lock();
consumerStatsRequestTimer_->cancel();
consumerStatsRequestTimer_.reset();
- lock.unlock();
}
+ lock.unlock();
+ LOG_INFO(cnxString_ << "Connection closed");
+
for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
}
@@ -1410,38 +1424,22 @@ void ClientConnection::close() {
connectPromise_.setFailed(ResultConnectError);
- // Fail all pending operations on the connection
- for (PendingRequestsMap::iterator it = pendingRequests_.begin(); it != pendingRequests_.end(); ++it) {
- it->second.promise.setFailed(ResultConnectError);
+ // Fail all pending requests, all these type are map whose value type contains the Promise object
+ for (auto& kv : pendingRequests) {
+ kv.second.promise.setFailed(ResultConnectError);
}
-
- // Fail all pending lookup-requests on the connection
- lock.lock();
- PendingLookupRequestsMap pendingLookupRequests;
- pendingLookupRequests_.swap(pendingLookupRequests);
- numOfPendingLookupRequest_ -= pendingLookupRequests.size();
-
- PendingConsumerStatsMap pendingConsumerStatsMap;
- pendingConsumerStatsMap_.swap(pendingConsumerStatsMap);
- lock.unlock();
-
- for (PendingLookupRequestsMap::iterator it = pendingLookupRequests.begin();
- it != pendingLookupRequests.end(); ++it) {
- it->second.promise->setFailed(ResultConnectError);
+ for (auto& kv : pendingLookupRequests) {
+ kv.second.promise->setFailed(ResultConnectError);
}
-
- for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap.begin();
- it != pendingConsumerStatsMap.end(); ++it) {
+ for (auto& kv : pendingConsumerStatsMap) {
LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
- it->second.setFailed(ResultConnectError);
+ kv.second.setFailed(ResultConnectError);
}
-
- if (tlsSocket_) {
- tlsSocket_->lowest_layer().close();
+ for (auto& kv : pendingGetLastMessageIdRequests) {
+ kv.second.setFailed(ResultConnectError);
}
-
- if (executor_) {
- executor_.reset();
+ for (auto& kv : pendingGetNamespaceTopicsRequests) {
+ kv.second.setFailed(ResultConnectError);
}
}