You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/12/20 11:57:10 UTC

[pulsar] 20/22: Fix frequent segmentation fault of Python tests by refactoring ExecutorService (#12427)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b0f4d9f078cd0ce36fbb072f4a425d0cfa709c6a
Author: Yunze Xu <xy...@163.com>
AuthorDate: Thu Oct 21 14:29:43 2021 +0800

    Fix frequent segmentation fault of Python tests by refactoring ExecutorService (#12427)
    
    * Refactor ExecutorService to hold io_service and its work guard as direct members instead of through pointers
    
    * Remove unnecessary friend class
    
    * Fix CentOS 7 build
    
    * Fix PeriodicTaskTest
    
    (cherry picked from commit af0ea69a0368a2804cedbc93a8911b0aebf26dbc)
---
 pulsar-client-cpp/lib/ClientConnection.cc   | 10 +++---
 pulsar-client-cpp/lib/ExecutorService.cc    | 55 ++++++++++++++++-------------
 pulsar-client-cpp/lib/ExecutorService.h     | 42 ++++++++++------------
 pulsar-client-cpp/tests/PeriodicTaskTest.cc | 12 +++----
 4 files changed, 61 insertions(+), 58 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientConnection.cc b/pulsar-client-cpp/lib/ClientConnection.cc
index 3f82e35..b9f2209 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -162,11 +162,11 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
       resolver_(executor_->createTcpResolver()),
       socket_(executor_->createSocket()),
 #if BOOST_VERSION >= 107000
-      strand_(boost::asio::make_strand(executor_->io_service_->get_executor())),
+      strand_(boost::asio::make_strand(executor_->getIOService().get_executor())),
 #elif BOOST_VERSION >= 106600
-      strand_(executor_->io_service_->get_executor()),
+      strand_(executor_->getIOService().get_executor()),
 #else
-      strand_(*(executor_->io_service_)),
+      strand_(executor_->getIOService()),
 #endif
       logicalAddress_(logicalAddress),
       physicalAddress_(physicalAddress),
@@ -183,7 +183,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
 #if BOOST_VERSION >= 105400
         boost::asio::ssl::context ctx(boost::asio::ssl::context::tlsv12_client);
 #else
-        boost::asio::ssl::context ctx(*executor_->io_service_, boost::asio::ssl::context::tlsv1_client);
+        boost::asio::ssl::context ctx(executor_->getIOService(), boost::asio::ssl::context::tlsv1_client);
 #endif
         Url serviceUrl;
         Url::parse(physicalAddress, serviceUrl);
@@ -240,7 +240,7 @@ ClientConnection::ClientConnection(const std::string& logicalAddress, const std:
             }
         }
 
-        tlsSocket_ = executor_->createTlsSocket(socket_, ctx);
+        tlsSocket_ = ExecutorService::createTlsSocket(socket_, ctx);
 
         LOG_DEBUG("TLS SNI Host: " << serviceUrl.host());
         if (!SSL_set_tlsext_host_name(tlsSocket_->native_handle(), serviceUrl.host().c_str())) {
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 4db3112..9cfbd82 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -27,22 +27,40 @@ DECLARE_LOG_OBJECT()
 
 namespace pulsar {
 
-ExecutorService::ExecutorService()
-    : io_service_(new boost::asio::io_service()),
-      work_(new BackgroundWork(*io_service_)),
-      worker_(std::bind(&ExecutorService::startWorker, this, io_service_)) {}
+ExecutorService::ExecutorService() {}
 
 ExecutorService::~ExecutorService() { close(); }
 
-void ExecutorService::startWorker(std::shared_ptr<boost::asio::io_service> io_service) { io_service_->run(); }
+void ExecutorService::start() {
+    auto self = shared_from_this();
+    std::thread t{[self] {
+        if (self->isClosed()) {
+            return;
+        }
+        boost::system::error_code ec;
+        self->getIOService().run(ec);
+        if (ec) {
+            LOG_ERROR("Failed to run io_service: " << ec.message());
+        }
+    }};
+    t.detach();
+}
+
+ExecutorServicePtr ExecutorService::create() {
+    // make_shared cannot access the private constructor, so we need to expose the private constructor via a
+    // derived class.
+    struct ExecutorServiceImpl : public ExecutorService {};
+
+    auto executor = std::make_shared<ExecutorServiceImpl>();
+    executor->start();
+    return std::static_pointer_cast<ExecutorService>(executor);
+}
 
 /*
  *  factory method of boost::asio::ip::tcp::socket associated with io_service_ instance
  *  @ returns shared_ptr to this socket
  */
-SocketPtr ExecutorService::createSocket() {
-    return SocketPtr(new boost::asio::ip::tcp::socket(*io_service_));
-}
+SocketPtr ExecutorService::createSocket() { return SocketPtr(new boost::asio::ip::tcp::socket(io_service_)); }
 
 TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx) {
     return std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> >(
@@ -54,11 +72,11 @@ TlsSocketPtr ExecutorService::createTlsSocket(SocketPtr &socket, boost::asio::ss
  *  @returns shraed_ptr to resolver object
  */
 TcpResolverPtr ExecutorService::createTcpResolver() {
-    return TcpResolverPtr(new boost::asio::ip::tcp::resolver(*io_service_));
+    return TcpResolverPtr(new boost::asio::ip::tcp::resolver(io_service_));
 }
 
 DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
-    return DeadlineTimerPtr(new boost::asio::deadline_timer(*io_service_));
+    return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
 }
 
 void ExecutorService::close() {
@@ -67,21 +85,10 @@ void ExecutorService::close() {
         return;
     }
 
-    io_service_->stop();
-    work_.reset();
-    // Detach the worker thread instead of join to avoid potential deadlock
-    if (worker_.joinable()) {
-        try {
-            worker_.detach();
-        } catch (const std::system_error &e) {
-            // This condition will happen if we're forking the process, therefore the thread was not ported to
-            // the child side of the fork and the detach would be failing.
-            LOG_DEBUG("Failed to detach thread: " << e.what());
-        }
-    }
+    io_service_.stop();
 }
 
-void ExecutorService::postWork(std::function<void(void)> task) { io_service_->post(task); }
+void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
 
 /////////////////////
 
@@ -93,7 +100,7 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
 
     int idx = executorIdx_++ % executors_.size();
     if (!executors_[idx]) {
-        executors_[idx] = std::make_shared<ExecutorService>();
+        executors_[idx] = ExecutorService::create();
     }
 
     return executors_[idx];
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 6746936..6b09091 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -25,7 +25,6 @@
 #include <boost/asio/ssl.hpp>
 #include <functional>
 #include <thread>
-#include <boost/noncopyable.hpp>
 #include <mutex>
 #include <pulsar/defines.h>
 
@@ -34,51 +33,48 @@ typedef std::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
 typedef std::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket &> > TlsSocketPtr;
 typedef std::shared_ptr<boost::asio::ip::tcp::resolver> TcpResolverPtr;
 typedef std::shared_ptr<boost::asio::deadline_timer> DeadlineTimerPtr;
-class PULSAR_PUBLIC ExecutorService : private boost::noncopyable {
-    friend class ClientConnection;
-
+class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<ExecutorService> {
    public:
-    ExecutorService();
+    using IOService = boost::asio::io_service;
+    using SharedPtr = std::shared_ptr<ExecutorService>;
+
+    static SharedPtr create();
     ~ExecutorService();
 
+    ExecutorService(const ExecutorService &) = delete;
+    ExecutorService &operator=(const ExecutorService &) = delete;
+
     SocketPtr createSocket();
-    TlsSocketPtr createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx);
+    static TlsSocketPtr createTlsSocket(SocketPtr &socket, boost::asio::ssl::context &ctx);
     TcpResolverPtr createTcpResolver();
     DeadlineTimerPtr createDeadlineTimer();
     void postWork(std::function<void(void)> task);
+
     void close();
 
-    boost::asio::io_service &getIOService() { return *io_service_; }
+    IOService &getIOService() { return io_service_; }
+    bool isClosed() const noexcept { return closed_; }
 
    private:
     /*
-     *  only called once and within lock so no need to worry about thread-safety
-     */
-    void startWorker(std::shared_ptr<boost::asio::io_service> io_service);
-
-    /*
      * io_service is our interface to os, io object schedule async ops on this object
      */
-    std::shared_ptr<boost::asio::io_service> io_service_;
+    IOService io_service_;
 
     /*
      * work will not let io_service.run() return even after it has finished work
      * it will keep it running in the background so we don't have to take care of it
      */
-    typedef boost::asio::io_service::work BackgroundWork;
-    std::unique_ptr<BackgroundWork> work_;
-
-    /*
-     * worker thread which runs until work object is destroyed, it's running io_service::run in
-     * background invoking async handlers as they are finished and result is available from
-     * io_service
-     */
-    std::thread worker_;
+    IOService::work work_{io_service_};
 
     std::atomic_bool closed_{false};
+
+    ExecutorService();
+
+    void start();
 };
 
-typedef std::shared_ptr<ExecutorService> ExecutorServicePtr;
+using ExecutorServicePtr = ExecutorService::SharedPtr;
 
 class PULSAR_PUBLIC ExecutorServiceProvider {
    public:
diff --git a/pulsar-client-cpp/tests/PeriodicTaskTest.cc b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
index 11c1c62..2c1da70 100644
--- a/pulsar-client-cpp/tests/PeriodicTaskTest.cc
+++ b/pulsar-client-cpp/tests/PeriodicTaskTest.cc
@@ -29,11 +29,11 @@ DECLARE_LOG_OBJECT()
 using namespace pulsar;
 
 TEST(PeriodicTaskTest, testCountdownTask) {
-    ExecutorService executor;
+    auto executor = ExecutorService::create();
 
     std::atomic_int count{5};
 
-    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), 200);
+    auto task = std::make_shared<PeriodicTask>(executor->getIOService(), 200);
     task->setCallback([task, &count](const PeriodicTask::ErrorCode& ec) {
         if (--count <= 0) {
             task->stop();
@@ -56,13 +56,13 @@ TEST(PeriodicTaskTest, testCountdownTask) {
     ASSERT_EQ(count.load(), 0);
     task->stop();
 
-    executor.close();
+    executor->close();
 }
 
 TEST(PeriodicTaskTest, testNegativePeriod) {
-    ExecutorService executor;
+    auto executor = ExecutorService::create();
 
-    auto task = std::make_shared<PeriodicTask>(executor.getIOService(), -1);
+    auto task = std::make_shared<PeriodicTask>(executor->getIOService(), -1);
     std::atomic_bool callbackTriggered{false};
     task->setCallback([&callbackTriggered](const PeriodicTask::ErrorCode& ec) { callbackTriggered = true; });
 
@@ -71,5 +71,5 @@ TEST(PeriodicTaskTest, testNegativePeriod) {
     ASSERT_EQ(callbackTriggered.load(), false);
     task->stop();
 
-    executor.close();
+    executor->close();
 }