You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/12 07:16:52 UTC

[pulsar] branch branch-2.6 updated: [C++] Fix some pending requests may never complete when broker's down (#8232)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new bf83b1b  [C++] Fix some pending requests may never complete when broker's down (#8232)
bf83b1b is described below

commit bf83b1b29dcb77a740b27d9ab2369fdf78b1d4b4
Author: Yunze Xu <xy...@163.com>
AuthorDate: Mon Oct 12 15:14:39 2020 +0800

    [C++] Fix some pending requests may never complete when broker's down (#8232)
    
    Fixes #8230
    
    ### Motivation
    
    When `ClientConnection` calls `close`, the promises of some pending requests were not completed. As a result, methods that wait for such a promise to complete, like `Reader::hasMessageAvailable`, may never return.
    
    ### Modifications
    
    - Complete `GetLastMessageId` and `GetNamespaceTopics` requests in `ClientConnection::close`.
    - Refactor `ClientConnection::close` to use move instead of swap, so that some fields are processed outside the locked section.
    - Add an error log before the connection is closed because of a read error.
    
    ### Verifying this change
    
    Try to reproduce #8230: `Reader::hasMessageAvailable` now returns `ResultConnectError` if the broker is killed by force.
    (cherry picked from commit 5e60775fcf607be202641a45166c5d8781e3de87)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 72 +++++++++++++++----------------
 1 file changed, 35 insertions(+), 37 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index ebb7268..6394d25 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -498,6 +498,9 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
     incomingBuffer_.bytesWritten(bytesTransferred);
 
     if (err || bytesTransferred == 0) {
+        if (err) {
+            LOG_ERROR(cnxString_ << "Read failed: " << err.message());
+        }  // else: bytesTransferred == 0, which means server has closed the connection
         close();
     } else if (bytesTransferred < minReadSize) {
         // Read the remaining part, use a slice of buffer to write on the next
@@ -1378,28 +1381,39 @@ void ClientConnection::close() {
     state_ = Disconnected;
     boost::system::error_code err;
     socket_->close(err);
-    ConsumersMap consumers;
-    consumers.swap(consumers_);
-    ProducersMap producers;
-    producers.swap(producers_);
-    lock.unlock();
 
-    LOG_INFO(cnxString_ << "Connection closed");
+    if (tlsSocket_) {
+        tlsSocket_->lowest_layer().close();
+    }
+
+    if (executor_) {
+        executor_.reset();
+    }
+
+    // Move the internal fields to process them after `mutex_` was unlocked
+    auto consumers = std::move(consumers_);
+    auto producers = std::move(producers_);
+    auto pendingRequests = std::move(pendingRequests_);
+    auto pendingLookupRequests = std::move(pendingLookupRequests_);
+    auto pendingConsumerStatsMap = std::move(pendingConsumerStatsMap_);
+    auto pendingGetLastMessageIdRequests = std::move(pendingGetLastMessageIdRequests_);
+    auto pendingGetNamespaceTopicsRequests = std::move(pendingGetNamespaceTopicsRequests_);
+
+    numOfPendingLookupRequest_ = 0;
 
     if (keepAliveTimer_) {
-        lock.lock();
         keepAliveTimer_->cancel();
         keepAliveTimer_.reset();
-        lock.unlock();
     }
 
     if (consumerStatsRequestTimer_) {
-        lock.lock();
         consumerStatsRequestTimer_->cancel();
         consumerStatsRequestTimer_.reset();
-        lock.unlock();
     }
 
+    lock.unlock();
+    LOG_INFO(cnxString_ << "Connection closed");
+
     for (ProducersMap::iterator it = producers.begin(); it != producers.end(); ++it) {
         HandlerBase::handleDisconnection(ResultConnectError, shared_from_this(), it->second);
     }
@@ -1410,38 +1424,22 @@ void ClientConnection::close() {
 
     connectPromise_.setFailed(ResultConnectError);
 
-    // Fail all pending operations on the connection
-    for (PendingRequestsMap::iterator it = pendingRequests_.begin(); it != pendingRequests_.end(); ++it) {
-        it->second.promise.setFailed(ResultConnectError);
+    // Fail all pending requests, all these type are map whose value type contains the Promise object
+    for (auto& kv : pendingRequests) {
+        kv.second.promise.setFailed(ResultConnectError);
     }
-
-    // Fail all pending lookup-requests on the connection
-    lock.lock();
-    PendingLookupRequestsMap pendingLookupRequests;
-    pendingLookupRequests_.swap(pendingLookupRequests);
-    numOfPendingLookupRequest_ -= pendingLookupRequests.size();
-
-    PendingConsumerStatsMap pendingConsumerStatsMap;
-    pendingConsumerStatsMap_.swap(pendingConsumerStatsMap);
-    lock.unlock();
-
-    for (PendingLookupRequestsMap::iterator it = pendingLookupRequests.begin();
-         it != pendingLookupRequests.end(); ++it) {
-        it->second.promise->setFailed(ResultConnectError);
+    for (auto& kv : pendingLookupRequests) {
+        kv.second.promise->setFailed(ResultConnectError);
     }
-
-    for (PendingConsumerStatsMap::iterator it = pendingConsumerStatsMap.begin();
-         it != pendingConsumerStatsMap.end(); ++it) {
+    for (auto& kv : pendingConsumerStatsMap) {
         LOG_ERROR(cnxString_ << " Closing Client Connection, please try again later");
-        it->second.setFailed(ResultConnectError);
+        kv.second.setFailed(ResultConnectError);
     }
-
-    if (tlsSocket_) {
-        tlsSocket_->lowest_layer().close();
+    for (auto& kv : pendingGetLastMessageIdRequests) {
+        kv.second.setFailed(ResultConnectError);
     }
-
-    if (executor_) {
-        executor_.reset();
+    for (auto& kv : pendingGetNamespaceTopicsRequests) {
+        kv.second.setFailed(ResultConnectError);
     }
 }