You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:57:10 UTC
[pulsar] 20/22: Fix frequent segmentation fault of Python tests by refactoring ExecutorService (#12427)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b0f4d9f078cd0ce36fbb072f4a425d0cfa709c6a
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 21 14:29:43 2021 +0800
Fix frequent segmentation fault of Python tests by refactoring ExecutorService (#12427)
* Refactor ExecutorService to avoid usage of pointers
* Remove unnecessary friend class
* Fix CentOS 7 build
* Fix PeriodicTaskTest
(cherry picked from commit af0ea69a0368a2804cedbc93a8911b0aebf26dbc)
---
pulsar-client-cpp/lib/ClientConnection.cc | 10 +++---
pulsar-client-cpp/lib/ExecutorService.cc | 55 ++++++++++++++++-------------
pulsar-client-cpp/lib/ExecutorService.h | 42 ++++++++++------------
pulsar-client-cpp/tests/PeriodicTaskTest.cc | 12 +++----
4 files changed, 61 insertions(+), 58 deletions(-)
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3f82e35..b9f2209 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -162,11 +162,11 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
resolver_(executor_->createTcpResolver()),
socket_(executor_->createSocket()),
#if BOOST_VERSION >= 107000
- strand_(boost::asio::make_strand(executor_->io_service_->get_executor())),
+ strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
#elif BOOST_VERSION >= 106600
- strand_(executor_->io_service_->get_executor()),
+ strand_(executor_->getIOService().get_executor()),
#else
- strand_(*(executor_->io_service_)),
+ strand_(executor_->getIOService()),
#endif
logicalAddress_(logicalAddress),
physicalAddress_(physicalAddress),
@@ -183,7 +183,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
#if BOOST_VERSION >= 105400
boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
#else
- boost::asio::ssl::context ctx(*executor_->io_service_, boost::asio::ssl::context::tlsv1_client);
+ boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client);
#endif
Url serviceUrl;
Url::parse(physicalAddress, serviceUrl);
@@ -240,7 +240,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
}
}
- tlsSocket_ = executor_->createTlsSocket(socket_, ctx);
+ tlsSocket_ = ExecutorService::createTlsSocket(socket_, ctx);
LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) {
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 4db3112..9cfbd82 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -27,22 +27,40 @@ DECLARE_LOG_OBJECT()
namespace pulsar {
-ExecutorService::ExecutorService()
- : io_service_(new boost::asio::io_service()),
- work_(new BackgroundWork(*io_service_)),
- worker_(std::bind(&ExecutorService::startWorker, this, io_service_)) {}
+ExecutorService::ExecutorService() {}
ExecutorService::~ExecutorService() { close(); }
-void ExecutorService::startWorker(std::shared_ptr<boost::asio::io_service> io_service) { io_service_->run(); }
+void ExecutorService::start() {
+ auto self = shared_from_this();
+ std::thread t{[self] {
+ if (self->isClosed()) {
+ return;
+ }
+ boost::system::error_code ec;
+ self->getIOService().run(ec);
+ if (ec) {
+ LOG_ERROR("Failed to run io_service: " << ec.message());
+ }
+ }};
+ t.detach();
+}
+
+ExecutorServicePtr ExecutorService::create() {
+ // make_shared cannot access the private constructor, so we need to expose the private constructor via a
+ // derived class.
+ struct ExecutorServiceImpl : public ExecutorService {};
+
+ auto executor = std::make_shared<ExecutorServiceImpl>();
+ executor->start();
+ return std::static_pointer_cast<ExecutorService>(executor);
+}
/*
* factory method of boost::asio::ip::tcp::socket associated with io_service_ instance
* @ returns shared_ptr to this socket
*/
-SocketPtr ExecutorService::createSocket() {
- return SocketPtr(new boost::asio::ip::tcp::socket(*io_service_));
-}
+SocketPtr ExecutorService::createSocket() { return SocketPtr(new boost::asio::ip::tcp::socket(io_service_)); }
TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) {
return std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> >(
@@ -54,11 +72,11 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ss
* @returns shraed_ptr to resolver object
*/
TcpResolverPtr ExecutorService::createTcpResolver() {
- return TcpResolverPtr(new boost::asio::ip::tcp::resolver(*io_service_));
+ return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_));
}
DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
- return DeadlineTimerPtr(new boost::asio::deadline_timer(*io_service_));
+ return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
}
void ExecutorService::close() {
@@ -67,21 +85,10 @@ void ExecutorService::close() {
return;
}
- io_service_->stop();
- work_.reset();
- // Detach the worker thread instead of join to avoid potential deadlock
- if (worker_.joinable()) {
- try {
- worker_.detach();
- } catch (const std::system_error &e) {
- // This condition will happen if we're forking the process, therefore the thread was not ported to
- // the child side of the fork and the detach would be failing.
- LOG_DEBUG("Failed to detach thread: " << e.what());
- }
- }
+ io_service_.stop();
}
-void ExecutorService::postWork(std::function<void(void)> task) { io_service_->post(task); }
+void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
/////////////////////
@@ -93,7 +100,7 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
int idx = executorIdx_++ % executors_.size();
if (!executors_[idx]) {
- executors_[idx] = std::make_shared<ExecutorService>();
+ executors_[idx] = ExecutorService::create();
}
return executors_[idx];
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 6746936..6b09091 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -25,7 +25,6 @@
#include <boost/asio/ssl.hpp>
#include <functional>
#include <thread>
-#include <boost/noncopyable.hpp>
#include <mutex>
#include <pulsar/defines.h>
@@ -34,51 +33,48 @@ typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
typedef std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> > TlsSocketPtr;
typedef std::shared_ptr<boost::asio::ip::tcp::resolver> TcpResolverPtr;
typedef std::shared_ptr<boost::asio::deadline_timer> DeadlineTimerPtr;
-class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
- friend class ClientConnection;
-
+class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
public:
- ExecutorService();
+ using IOService = boost::asio::io_service;
+ using SharedPtr = std::shared_ptr<ExecutorService>;
+
+ static SharedPtr create();
~ExecutorService();
+ ExecutorService(const ExecutorService &) = delete;
+ ExecutorService &operator=(const ExecutorService &) = delete;
+
SocketPtr createSocket();
- TlsSocketPtr createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx);
+ static TlsSocketPtr createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx);
TcpResolverPtr createTcpResolver();
DeadlineTimerPtr createDeadlineTimer();
void postWork(std::function<void(void)> task);
+
void close();
- boost::asio::io_service &getIOService() { return *io_service_; }
+ IOService &getIOService() { return io_service_; }
+ bool isClosed() const noexcept { return closed_; }
private:
/*
- * only called once and within lock so no need to worry about thread-safety
- */
- void startWorker(std::shared_ptr<boost::asio::io_service> io_service);
-
- /*
* io_service is our interface to os, io object schedule async ops on this object
*/
- std::shared_ptr<boost::asio::io_service> io_service_;
+ IOService io_service_;
/*
* work will not let io_service.run() return even after it has finished work
* it will keep it running in the background so we don't have to take care of it
*/
- typedef boost::asio::io_service::work BackgroundWork;
- std::unique_ptr<BackgroundWork> work_;
-
- /*
- * worker thread which runs until work object is destroyed, it's running io_service::run in
- * background invoking async handlers as they are finished and result is available from
- * io_service
- */
- std::thread worker_;
+ IOService::work work_{io_service_};
std::atomic_bool closed_{false};
+
+ ExecutorService();
+
+ void start();
};
-typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
+using ExecutorServicePtr = ExecutorService::SharedPtr;
class PULSAR_PUBLIC ExecutorServiceProvider {
public:
diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
index 11c1c62..2c1da70 100644
--- a/pulsar-client-cpp/tests/PeriodicTaskTest.cc
+++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
@@ -29,11 +29,11 @@ DECLARE_LOG_OBJECT()
using namespace pulsar;
TEST(PeriodicTaskTest, testCountdownTask) {
- ExecutorService executor;
+ auto executor = ExecutorService::create();
std::atomic_int count{5};
- auto task = std::make_shared<PeriodicTask>(executor.getIOService(), 200);
+ auto task = std::make_shared<PeriodicTask>(executor->getIOService(), 200);
task->setCallback([task, &count](const PeriodicTask::ErrorCode& ec) {
if (--count <= 0) {
task->stop();
@@ -56,13 +56,13 @@ TEST(PeriodicTaskTest, testCountdownTask) {
ASSERT_EQ(count.load(), 0);
task->stop();
- executor.close();
+ executor->close();
}
TEST(PeriodicTaskTest, testNegativePeriod) {
- ExecutorService executor;
+ auto executor = ExecutorService::create();
- auto task = std::make_shared<PeriodicTask>(executor.getIOService(), -1);
+ auto task = std::make_shared<PeriodicTask>(executor->getIOService(), -1);
std::atomic_bool callbackTriggered{false};
task->setCallback([&callbackTriggered](const PeriodicTask::ErrorCode& ec) { callbackTriggered = true; });
@@ -71,5 +71,5 @@ TEST(PeriodicTaskTest, testNegativePeriod) {
ASSERT_EQ(callbackTriggered.load(), false);
task->stop();
- executor.close();
+ executor->close();
}