You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2021/09/04 00:19:14 UTC

[pulsar] 02/05: Fix deadlock caused by ExecutorService::close (#11882)

This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c8eddf30af4d1f531db6e563f611f22c730397a3
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Sep 3 00:53:12 2021 +0800

    Fix deadlock caused by ExecutorService::close (#11882)
    
    (cherry picked from commit 43b4ff6aaeef6969c73af6e3742f57554f76e1ca)
---
 pulsar-client-cpp/lib/ClientConnection.cc | 6 +++++-
 pulsar-client-cpp/lib/ExecutorService.cc  | 8 +++-----
 2 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index ac228b1..0411b2f 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -561,7 +561,11 @@ void ClientConnection::handleRead(const boost::system::error_code& err, size_t b
 
     if (err || bytesTransferred == 0) {
         if (err) {
-            LOG_ERROR(cnxString_ << "Read failed: " << err.message());
+            if (err == boost::asio::error::operation_aborted) {
+                LOG_DEBUG(cnxString_ << "Read operation was cancelled");
+            } else {
+                LOG_ERROR(cnxString_ << "Read failed: " << err.message());
+            }
         }  // else: bytesTransferred == 0, which means server has closed the connection
         close();
     } else if (bytesTransferred < minReadSize) {
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 8de0459..7cd4a49 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -68,11 +68,9 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
 void ExecutorService::close() {
     io_service_->stop();
     work_.reset();
-    // If this thread is attempting to join itself, do not. The destructor's
-    // call to close will handle joining if it does not occur here. This also ensures
-    // join is not called twice since it is not re-entrant on windows
-    if (std::this_thread::get_id() != worker_.get_id() && worker_.joinable()) {
-        worker_.join();
+    // Detach the worker thread instead of joining it to avoid a potential deadlock
+    if (worker_.joinable()) {
+        worker_.detach();
     }
 }