You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/18 07:19:04 UTC
[pulsar] 01/04: [C++] Make some clean up methods thread safe (#11762)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 0c29a884812247663168511d5c71b15257734f06
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Aug 25 14:19:49 2021 +0800
[C++] Make some clean up methods thread safe (#11762)
* Make some close methods thread safe
* Restore shutdown() in ClientImpl's destructor and check whether connection pool is closed
(cherry picked from commit 098ba16c15e81dad84e5b03a6565f8bb9941ad7a)
---
pulsar-client-cpp/lib/ClientImpl.cc | 13 ++++++++++++-
pulsar-client-cpp/lib/ConnectionPool.cc | 14 +++++++++++++-
pulsar-client-cpp/lib/ConnectionPool.h | 9 ++++++++-
pulsar-client-cpp/lib/ExecutorService.cc | 7 +++++++
pulsar-client-cpp/lib/ExecutorService.h | 3 +++
pulsar-client-cpp/lib/Producer.cc | 1 -
pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc | 10 ++++++++++
pulsar-client-cpp/tests/BinaryLookupServiceTest.cc | 2 --
pulsar-client-cpp/tests/CustomLoggerTest.cc | 4 ++--
pulsar-client-cpp/tests/MessageTest.cc | 2 --
pulsar-client-cpp/tests/ReaderConfigurationTest.cc | 3 ---
11 files changed, 55 insertions(+), 13 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 5abe6ec..613a979 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -560,10 +560,21 @@ void ClientImpl::shutdown() {
}
}
- pool_.close();
+ if (producers.size() + consumers.size() > 0) {
+ LOG_DEBUG(producers.size() << " producers and " << consumers.size()
+ << " consumers have been shutdown.");
+ }
+ if (!pool_.close()) {
+ // pool_ has already been closed. It means shutdown() has been called before.
+ return;
+ }
+ LOG_DEBUG("ConnectionPool is closed");
ioExecutorProvider_->close();
+ LOG_DEBUG("ioExecutorProvider_ is closed");
listenerExecutorProvider_->close();
+ LOG_DEBUG("listenerExecutorProvider_ is closed");
partitionListenerExecutorProvider_->close();
+ LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
}
uint64_t ClientImpl::newProducerId() {
diff --git a/pulsar-client-cpp/lib/ConnectionPool.cc b/pulsar-client-cpp/lib/ConnectionPool.cc
index bb4f5c2..e03697f 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.cc
+++ b/pulsar-client-cpp/lib/ConnectionPool.cc
@@ -41,7 +41,12 @@ ConnectionPool::ConnectionPool(const ClientConfiguration& conf, ExecutorServiceP
poolConnections_(poolConnections),
mutex_() {}
-void ConnectionPool::close() {
+bool ConnectionPool::close() {
+ bool expectedState = false;
+ if (!closed_.compare_exchange_strong(expectedState, true)) {
+ return false;
+ }
+
std::unique_lock<std::mutex> lock(mutex_);
if (poolConnections_) {
for (auto cnxIt = pool_.begin(); cnxIt != pool_.end(); cnxIt++) {
@@ -52,10 +57,17 @@ void ConnectionPool::close() {
}
pool_.clear();
}
+ return true;
}
Future<Result, ClientConnectionWeakPtr> ConnectionPool::getConnectionAsync(
const std::string& logicalAddress, const std::string& physicalAddress) {
+ if (closed_) {
+ Promise<Result, ClientConnectionWeakPtr> promise;
+ promise.setFailed(ResultAlreadyClosed);
+ return promise.getFuture();
+ }
+
std::unique_lock<std::mutex> lock(mutex_);
if (poolConnections_) {
diff --git a/pulsar-client-cpp/lib/ConnectionPool.h b/pulsar-client-cpp/lib/ConnectionPool.h
index 8b35044..5032e80 100644
--- a/pulsar-client-cpp/lib/ConnectionPool.h
+++ b/pulsar-client-cpp/lib/ConnectionPool.h
@@ -24,6 +24,7 @@
#include "ClientConnection.h"
+#include <atomic>
#include <string>
#include <map>
#include <mutex>
@@ -36,7 +37,12 @@ class PULSAR_PUBLIC ConnectionPool {
ConnectionPool(const ClientConfiguration& conf, ExecutorServiceProviderPtr executorProvider,
const AuthenticationPtr& authentication, bool poolConnections = true);
- void close();
+ /**
+ * Close the connection pool.
+ *
+ * @return false if it has already been closed.
+ */
+ bool close();
/**
* Get a connection from the pool.
@@ -65,6 +71,7 @@ class PULSAR_PUBLIC ConnectionPool {
PoolMap pool_;
bool poolConnections_;
std::mutex mutex_;
+ std::atomic_bool closed_{false};
friend class ConnectionPoolTest;
};
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index f7cb010..4db3112 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -62,6 +62,11 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
}
void ExecutorService::close() {
+ bool expectedState = false;
+ if (!closed_.compare_exchange_strong(expectedState, true)) {
+ return;
+ }
+
io_service_->stop();
work_.reset();
// Detach the worker thread instead of join to avoid potential deadlock
@@ -95,6 +100,8 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
}
void ExecutorServiceProvider::close() {
+ Lock lock(mutex_);
+
for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
if (*it != NULL) {
(*it)->close();
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index d0ffc23..6746936 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -19,6 +19,7 @@
#ifndef _PULSAR_EXECUTOR_SERVICE_HEADER_
#define _PULSAR_EXECUTOR_SERVICE_HEADER_
+#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
@@ -73,6 +74,8 @@ class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
* io_service
*/
std::thread worker_;
+
+ std::atomic_bool closed_{false};
};
typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index acd021b..ad60828 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -24,7 +24,6 @@
#include "ProducerImpl.h"
namespace pulsar {
-DECLARE_LOG_OBJECT()
static const std::string EMPTY_STRING;
diff --git a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
index 5c37f48..919536f 100644
--- a/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
+++ b/pulsar-client-cpp/lib/auth/athenz/ZTSClient.cc
@@ -38,7 +38,17 @@
#include <boost/property_tree/ptree.hpp>
namespace ptree = boost::property_tree;
+#if defined(__clang__)
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wunknown-warning-option"
+#endif
+
#include <boost/xpressive/xpressive.hpp>
+
+#if defined(__clang__)
+#pragma clang diagnostic pop
+#endif
+
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
index 11cc053..b880df3 100644
--- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
@@ -27,8 +27,6 @@
#include <pulsar/Authentication.h>
#include <boost/exception/all.hpp>
-DECLARE_LOG_OBJECT()
-
using namespace pulsar;
TEST(BinaryLookupServiceTest, basicLookup) {
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index ec83e42..0b4e76a 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -56,7 +56,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
// reset to previous log factory
Client client("pulsar://localhost:6650", clientConfig);
client.close();
- ASSERT_EQ(logLines.size(), 3);
+ ASSERT_EQ(logLines.size(), 7);
LogUtils::resetLoggerFactory();
});
testThread.join();
@@ -65,7 +65,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
Client client("pulsar://localhost:6650", clientConfig);
client.close();
// custom logger didn't get any new lines
- ASSERT_EQ(logLines.size(), 3);
+ ASSERT_EQ(logLines.size(), 7);
}
TEST(CustomLoggerTest, testConsoleLoggerFactory) {
diff --git a/pulsar-client-cpp/tests/MessageTest.cc b/pulsar-client-cpp/tests/MessageTest.cc
index 246203c..3c728c9 100644
--- a/pulsar-client-cpp/tests/MessageTest.cc
+++ b/pulsar-client-cpp/tests/MessageTest.cc
@@ -22,8 +22,6 @@
#include <string>
#include <lib/LogUtils.h>
-DECLARE_LOG_OBJECT()
-
using namespace pulsar;
TEST(MessageTest, testMessageContents) {
MessageBuilder msgBuilder1;
diff --git a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
index ccbfa2d..8dc60f4 100644
--- a/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
+++ b/pulsar-client-cpp/tests/ReaderConfigurationTest.cc
@@ -22,12 +22,9 @@
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
-#include <lib/LogUtils.h>
#include <lib/ReaderImpl.h>
#include "NoOpsCryptoKeyReader.h"
-DECLARE_LOG_OBJECT()
-
using namespace pulsar;
static const std::string lookupUrl = "pulsar://localhost:6650";