You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/18 07:19:04 UTC

[pulsar] 01/04: [C++] Make some clean up methods thread safe (#11762)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 0c29a884812247663168511d5c71b15257734f06
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Aug 25 14:19:49 2021 +0800

    [C++] Make some clean up methods thread safe (#11762)
    
    * Make some close methods thread safe
    
    * Restore shutdown() in ClientImpl's destructor and check whether connection pool is closed
    
    (cherry picked from commit 098ba16c15e81dad84e5b03a6565f8bb9941ad7a)
---
 pulsar-client-cpp/lib/ClientImpl.cc                | 13 ++++++++++++-
 pulsar-client-cpp/lib/ConnectionPool.cc            | 14 +++++++++++++-
 pulsar-client-cpp/lib/ConnectionPool.h             |  9 ++++++++-
 pulsar-client-cpp/lib/ExecutorService.cc           |  7 +++++++
 pulsar-client-cpp/lib/ExecutorService.h            |  3 +++
 pulsar-client-cpp/lib/Producer.cc                  |  1 -
 pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc     | 10 ++++++++++
 pulsar-client-cpp/tests/BinaryLookupServiceTest.cc |  2 --
 pulsar-client-cpp/tests/CustomLoggerTest.cc        |  4 ++--
 pulsar-client-cpp/tests/MessageTest.cc             |  2 --
 pulsar-client-cpp/tests/ReaderConfigurationTest.cc |  3 ---
 11 files changed, 55 insertions(+), 13 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 5abe6ec..613a979 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -560,10 +560,21 @@ void ClientImpl::shutdown() {
         }
     }
 
-    pool_.close();
+    if (producers.size() + consumers.size() > 0) {
+        LOG_DEBUG(producers.size() << " producers and " << consumers.size()
+                                   << " consumers have been shutdown.");
+    }
+    if (!pool_.close()) {
+        // pool_ has already been closed. It means shutdown() has been called before.
+        return;
+    }
+    LOG_DEBUG("ConnectionPool is closed");
     ioExecutorProvider_->close();
+    LOG_DEBUG("ioExecutorProvider_ is closed");
     listenerExecutorProvider_->close();
+    LOG_DEBUG("listenerExecutorProvider_ is closed");
     partitionListenerExecutorProvider_->close();
+    LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
 
 uint64_t ClientImpl::newProducerId() {
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index bb4f5c2..e03697f 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -41,7 +41,12 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
       poolConnections_(poolConnections),
       mutex_() {}
 
-void ConnectionPool::close() {
+bool ConnectionPool::close() {
+    bool expectedState = false;
+    if (!closed_.compare_exchange_strong(expectedState, true)) {
+        return false;
+    }
+
     std::unique_lock<std::mutex> lock(mutex_);
     if (poolConnections_) {
         for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
@@ -52,10 +57,17 @@ void ConnectionPool::close() {
         }
         pool_.clear();
     }
+    return true;
 }
 
 Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
     const std::string& logicalAddress, const std::string& physicalAddress) {
+    if (closed_) {
+        Promise<Result, ClientConnectionWeakPtr> promise;
+        promise.setFailed(ResultAlreadyClosed);
+        return promise.getFuture();
+    }
+
     std::unique_lock<std::mutex> lock(mutex_);
 
     if (poolConnections_) {
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 8b35044..5032e80 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -24,6 +24,7 @@
 
 #include "ClientConnection.h"
 
+#include <atomic>
 #include <string>
 #include <map>
 #include <mutex>
@@ -36,7 +37,12 @@ class PULSAR_PUBLIC ConnectionPool {
     ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
                    const AuthenticationPtr& authentication, bool poolConnections = true);
 
-    void close();
+    /**
+     * Close the connection pool.
+     *
+     * @return false if it has already been closed.
+     */
+    bool close();
 
     /**
      * Get a connection from the pool.
@@ -65,6 +71,7 @@ class PULSAR_PUBLIC ConnectionPool {
     PoolMap pool_;
     bool poolConnections_;
     std::mutex mutex_;
+    std::atomic_bool closed_{false};
 
     friend class ConnectionPoolTest;
 };
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index f7cb010..4db3112 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -62,6 +62,11 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
 }
 
 void ExecutorService::close() {
+    bool expectedState = false;
+    if (!closed_.compare_exchange_strong(expectedState, true)) {
+        return;
+    }
+
     io_service_->stop();
     work_.reset();
     // Detach the worker thread instead of join to avoid potential deadlock
@@ -95,6 +100,8 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
 }
 
 void ExecutorServiceProvider::close() {
+    Lock lock(mutex_);
+
     for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
         if (*it != NULL) {
             (*it)->close();
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index d0ffc23..6746936 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -19,6 +19,7 @@
 #ifndef _PULSAR_EXECUTOR_SERVICE_HEADER_
 #define _PULSAR_EXECUTOR_SERVICE_HEADER_
 
+#include <atomic>
 #include <memory>
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
@@ -73,6 +74,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
      * io_service
      */
     std::thread worker_;
+
+    std::atomic_bool closed_{false};
 };
 
 typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index acd021b..ad60828 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -24,7 +24,6 @@
 #include "ProducerImpl.h"
 
 namespace pulsar {
-DECLARE_LOG_OBJECT()
 
 static const std::string EMPTY_STRING;
 
diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 5c37f48..919536f 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -38,7 +38,17 @@
 #include <boost/property_tree/ptree.hpp>
 namespace ptree = boost::property_tree;
 
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunknown-warning-option"
+#endif
+
 #include <boost/xpressive/xpressive.hpp>
+
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+
 #include <boost/archive/iterators/base64_from_binary.hpp>
 #include <boost/archive/iterators/transform_width.hpp>
 
diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
index 11cc053..b880df3 100644
--- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
@@ -27,8 +27,6 @@
 #include <pulsar/Authentication.h>
 #include <boost/exception/all.hpp>
 
-DECLARE_LOG_OBJECT()
-
 using namespace pulsar;
 
 TEST(BinaryLookupServiceTest, basicLookup) {
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index ec83e42..0b4e76a 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
         // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 3);
+        ASSERT_EQ(logLines.size(), 7);
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
@@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 3);
+    ASSERT_EQ(logLines.size(), 7);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {
diff --git a/pulsar-client-cpp/tests/MessageTest.cc b/pulsar-client-cpp/tests/MessageTest.cc
index 246203c..3c728c9 100644
--- a/pulsar-client-cpp/tests/MessageTest.cc
+++ b/pulsar-client-cpp/tests/MessageTest.cc
@@ -22,8 +22,6 @@
 #include <string>
 #include <lib/LogUtils.h>
 
-DECLARE_LOG_OBJECT()
-
 using namespace pulsar;
 TEST(MessageTest, testMessageContents) {
     MessageBuilder msgBuilder1;
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
index ccbfa2d..8dc60f4 100644
--- a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -22,12 +22,9 @@
  */
 #include <gtest/gtest.h>
 #include <pulsar/Client.h>
-#include <lib/LogUtils.h>
 #include <lib/ReaderImpl.h>
 #include "NoOpsCryptoKeyReader.h"
 
-DECLARE_LOG_OBJECT()
-
 using namespace pulsar;
 
 static const std::string lookupUrl = "pulsar://localhost:6650";