You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/04/27 07:10:11 UTC

[pulsar] branch master updated: [C++] Wait until event loop terminates when closing the Client (#15316)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd78f39a925 [C++] Wait until event loop terminates when closing the Client (#15316)
cd78f39a925 is described below

commit cd78f39a92521f3847b022580a6e66e651b5cb4b
Author: Yunze Xu <xy...@163.com>
AuthorDate: Wed Apr 27 15:10:01 2022 +0800

    [C++] Wait until event loop terminates when closing the Client (#15316)
    
    * [C++] Wait until event loop terminates when closing the Client
    
    Fixes #13267
    
    ### Motivation
    
    Unlike the Java client, the `Client` of the C++ client has a `shutdown`
    method that is responsible for executing the following steps:
    1. Call `shutdown` on all internal producers and consumers
    2. Close all connections in the pool
    3. Close all executors of the executor providers.
    
    When an executor is closed, it calls `io_service::stop()`, which makes
    the event loop (`io_service::run()`) in another thread return as soon as
    possible. However, nothing waits for the event loop to actually return.
    If a client fails to create a producer or consumer, the `close` method
    calls `shutdown`, which closes all executors immediately, after which
    the application exits. In this case, the detached event loop thread
    might not have exited yet, so valgrind reports a memory leak.
    
    This memory leak can be avoided by sleeping for a while after
    `Client::close` returns, or if the application still has other work to
    do after that. However, we should still adopt the semantics that after
    `Client::shutdown` returns, all event loop threads have terminated.
    
    ### Modifications
    - Add a timeout parameter to the `close` method of `ExecutorService` and
      `ExecutorServiceProvider`: a positive value is the maximum time to
      block, 0 means do not wait, and a negative value means wait
      indefinitely.
    - Add a `TimeoutProcessor` helper class to update the remaining timeout
      after each call to a method that accepts the timeout parameter.
    - Call `close` on all `ExecutorServiceProvider`s in
      `ClientImpl::shutdown` with a 500ms timeout, which should be long
      enough. In addition, in the `handleClose` method, call `shutdown` in
      another thread to avoid a deadlock.
    
    ### Verifying this change
    
    After applying this patch, the reproduction code in #13627 passes the
    valgrind check.
    
    ```
    ==3013== LEAK SUMMARY:
    ==3013==    definitely lost: 0 bytes in 0 blocks
    ==3013==    indirectly lost: 0 bytes in 0 blocks
    ==3013==      possibly lost: 0 bytes in 0 blocks
    ```
---
 pulsar-client-cpp/lib/ClientImpl.cc         | 37 +++++++++++++++++-----
 pulsar-client-cpp/lib/ExecutorService.cc    | 33 +++++++++++++++-----
 pulsar-client-cpp/lib/ExecutorService.h     | 11 +++++--
 pulsar-client-cpp/lib/TimeUtils.h           | 48 +++++++++++++++++++++++++++++
 pulsar-client-cpp/tests/CustomLoggerTest.cc | 26 ++++++++++------
 5 files changed, 130 insertions(+), 25 deletions(-)

diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index 0b07b6e9f2d..60e3e248c12 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -26,6 +26,7 @@
 #include "PartitionedConsumerImpl.h"
 #include "MultiTopicsConsumerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
+#include "TimeUtils.h"
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <boost/algorithm/string/predicate.hpp>
 #include <sstream>
@@ -35,6 +36,7 @@
 #include <algorithm>
 #include <random>
 #include <mutex>
+#include <thread>
 #ifdef USE_LOG4CXX
 #include "Log4CxxLogger.h"
 #endif
@@ -538,13 +540,20 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
         lock.unlock();
 
         LOG_DEBUG("Shutting down producers and consumers for client");
-        shutdown();
-        if (callback) {
-            if (closingError != ResultOk) {
-                LOG_DEBUG("Problem in closing client, could not close one or more consumers or producers");
+        // handleClose() is called in ExecutorService's event loop, while shutdown() tried to wait the event
+        // loop exits. So here we use another thread to call shutdown().
+        auto self = shared_from_this();
+        std::thread shutdownTask{[this, self, callback] {
+            shutdown();
+            if (callback) {
+                if (closingError != ResultOk) {
+                    LOG_DEBUG(
+                        "Problem in closing client, could not close one or more consumers or producers");
+                }
+                callback(closingError);
             }
-            callback(closingError);
-        }
+        }};
+        shutdownTask.detach();
     }
 }
 
@@ -580,11 +589,25 @@ void ClientImpl::shutdown() {
         return;
     }
     LOG_DEBUG("ConnectionPool is closed");
-    ioExecutorProvider_->close();
+
+    // 500ms as the timeout is long enough because ExecutorService::close calls io_service::stop() internally
+    // and waits until io_service::run() in another thread returns, which should be as soon as possible after
+    // stop() is called.
+    TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{500};
+
+    timeoutProcessor.tik();
+    ioExecutorProvider_->close(timeoutProcessor.getLeftTimeout());
+    timeoutProcessor.tok();
     LOG_DEBUG("ioExecutorProvider_ is closed");
+
+    timeoutProcessor.tik();
     listenerExecutorProvider_->close();
+    timeoutProcessor.tok();
     LOG_DEBUG("listenerExecutorProvider_ is closed");
+
+    timeoutProcessor.tik();
     partitionListenerExecutorProvider_->close();
+    timeoutProcessor.tok();
     LOG_DEBUG("partitionListenerExecutorProvider_ is closed");
 }
 
diff --git a/pulsar-client-cpp/lib/ExecutorService.cc b/pulsar-client-cpp/lib/ExecutorService.cc
index 9cfbd82881d..b9b5ed46478 100644
--- a/pulsar-client-cpp/lib/ExecutorService.cc
+++ b/pulsar-client-cpp/lib/ExecutorService.cc
@@ -21,6 +21,7 @@
 #include <boost/asio.hpp>
 #include <functional>
 #include <memory>
+#include "TimeUtils.h"
 
 #include "LogUtils.h"
 DECLARE_LOG_OBJECT()
@@ -29,7 +30,7 @@ namespace pulsar {
 
 ExecutorService::ExecutorService() {}
 
-ExecutorService::~ExecutorService() { close(); }
+ExecutorService::~ExecutorService() { close(0); }
 
 void ExecutorService::start() {
     auto self = shared_from_this();
@@ -37,11 +38,16 @@ void ExecutorService::start() {
         if (self->isClosed()) {
             return;
         }
+        LOG_INFO("Run io_service in a single thread");
         boost::system::error_code ec;
         self->getIOService().run(ec);
         if (ec) {
             LOG_ERROR("Failed to run io_service: " << ec.message());
+        } else {
+            LOG_INFO("Event loop of ExecutorService exits successfully");
         }
+        self->ioServiceDone_ = true;
+        self->cond_.notify_all();
     }};
     t.detach();
 }
@@ -79,13 +85,23 @@ DeadlineTimerPtr ExecutorService::createDeadlineTimer() {
     return DeadlineTimerPtr(new boost::asio::deadline_timer(io_service_));
 }
 
-void ExecutorService::close() {
+void ExecutorService::close(long timeoutMs) {
     bool expectedState = false;
     if (!closed_.compare_exchange_strong(expectedState, true)) {
         return;
     }
+    if (timeoutMs == 0) {  // non-blocking
+        io_service_.stop();
+        return;
+    }
 
+    std::unique_lock<std::mutex> lock{mutex_};
     io_service_.stop();
+    if (timeoutMs > 0) {
+        cond_.wait_for(lock, std::chrono::milliseconds(timeoutMs), [this] { return ioServiceDone_.load(); });
+    } else {  // < 0
+        cond_.wait(lock, [this] { return ioServiceDone_.load(); });
+    }
 }
 
 void ExecutorService::postWork(std::function<void(void)> task) { io_service_.post(task); }
@@ -106,14 +122,17 @@ ExecutorServicePtr ExecutorServiceProvider::get() {
     return executors_[idx];
 }
 
-void ExecutorServiceProvider::close() {
+void ExecutorServiceProvider::close(long timeoutMs) {
     Lock lock(mutex_);
 
-    for (ExecutorList::iterator it = executors_.begin(); it != executors_.end(); ++it) {
-        if (*it != NULL) {
-            (*it)->close();
+    TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{timeoutMs};
+    for (auto &&executor : executors_) {
+        timeoutProcessor.tik();
+        if (executor) {
+            executor->close(timeoutProcessor.getLeftTimeout());
         }
-        it->reset();
+        timeoutProcessor.tok();
+        executor.reset();
     }
 }
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ExecutorService.h b/pulsar-client-cpp/lib/ExecutorService.h
index 6b0909194b7..e4cbb3ce62e 100644
--- a/pulsar-client-cpp/lib/ExecutorService.h
+++ b/pulsar-client-cpp/lib/ExecutorService.h
@@ -20,6 +20,8 @@
 #define _PULSAR_EXECUTOR_SERVICE_HEADER_
 
 #include <atomic>
+#include <condition_variable>
+#include <chrono>
 #include <memory>
 #include <boost/asio.hpp>
 #include <boost/asio/ssl.hpp>
@@ -50,7 +52,8 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
     DeadlineTimerPtr createDeadlineTimer();
     void postWork(std::function<void(void)> task);
 
-    void close();
+    // See TimeoutProcessor for the semantics of the parameter.
+    void close(long timeoutMs = 3000);
 
     IOService &getIOService() { return io_service_; }
     bool isClosed() const noexcept { return closed_; }
@@ -68,6 +71,9 @@ class PULSAR_PUBLIC ExecutorService : public std::enable_shared_from_this<Execut
     IOService::work work_{io_service_};
 
     std::atomic_bool closed_{false};
+    std::mutex mutex_;
+    std::condition_variable cond_;
+    std::atomic_bool ioServiceDone_{false};
 
     ExecutorService();
 
@@ -82,7 +88,8 @@ class PULSAR_PUBLIC ExecutorServiceProvider {
 
     ExecutorServicePtr get();
 
-    void close();
+    // See TimeoutProcessor for the semantics of the parameter.
+    void close(long timeoutMs = 3000);
 
    private:
     typedef std::vector<ExecutorServicePtr> ExecutorList;
diff --git a/pulsar-client-cpp/lib/TimeUtils.h b/pulsar-client-cpp/lib/TimeUtils.h
index 1da7d65923a..45157ae855b 100644
--- a/pulsar-client-cpp/lib/TimeUtils.h
+++ b/pulsar-client-cpp/lib/TimeUtils.h
@@ -19,6 +19,8 @@
 #pragma once
 
 #include <boost/date_time/local_time/local_time.hpp>
+#include <atomic>
+#include <chrono>
 
 #include <pulsar/defines.h>
 
@@ -33,4 +35,50 @@ class PULSAR_PUBLIC TimeUtils {
     static ptime now();
     static int64_t currentTimeMillis();
 };
+
+// This class processes a timeout with the following semantics:
+//  > 0: wait at most the timeout until a blocking operation completes
+//  == 0: do not wait the blocking operation
+//  < 0: wait infinitely until a blocking operation completes.
+//
+// Here is a simple example usage:
+//
+// ```c++
+// // Wait at most 300 milliseconds
+// TimeoutProcessor<std::chrono::milliseconds> timeoutProcessor{300};
+// while (!allOperationsAreDone()) {
+//     timeoutProcessor.tik();
+//     // This method may block for some time
+//     performBlockingOperation(timeoutProcessor.getLeftTimeout());
+//     timeoutProcessor.tok();
+// }
+// ```
+//
+// The template argument is the same as std::chrono::duration.
+template <typename Duration>
+class TimeoutProcessor {
+   public:
+    using Clock = std::chrono::high_resolution_clock;
+
+    TimeoutProcessor(long timeout) : leftTimeout_(timeout) {}
+
+    long getLeftTimeout() const noexcept { return leftTimeout_; }
+
+    void tik() { before_ = Clock::now(); }
+
+    void tok() {
+        if (leftTimeout_ > 0) {
+            leftTimeout_ -= std::chrono::duration_cast<Duration>(Clock::now() - before_).count();
+            if (leftTimeout_ <= 0) {
+                // The timeout exceeds, getLeftTimeout() will return 0 to indicate we should not wait more
+                leftTimeout_ = 0;
+            }
+        }
+    }
+
+   private:
+    std::atomic_long leftTimeout_;
+    std::chrono::time_point<Clock> before_;
+};
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/CustomLoggerTest.cc b/pulsar-client-cpp/tests/CustomLoggerTest.cc
index 0b4e76adcc4..bd80c312e3b 100644
--- a/pulsar-client-cpp/tests/CustomLoggerTest.cc
+++ b/pulsar-client-cpp/tests/CustomLoggerTest.cc
@@ -20,6 +20,7 @@
 #include <pulsar/ConsoleLoggerFactory.h>
 #include <LogUtils.h>
 #include <gtest/gtest.h>
+#include <atomic>
 #include <thread>
 
 using namespace pulsar;
@@ -28,35 +29,42 @@ static std::vector<std::string> logLines;
 
 class MyTestLogger : public Logger {
    public:
-    MyTestLogger() = default;
+    MyTestLogger(const std::string &fileName) : fileName_(fileName) {}
 
     bool isEnabled(Level level) override { return true; }
 
     void log(Level level, int line, const std::string &message) override {
         std::stringstream ss;
-        ss << " " << level << ":" << line << " " << message << std::endl;
+        ss << std::this_thread::get_id() << " " << level << " " << fileName_ << ":" << line << " " << message
+           << std::endl;
         logLines.emplace_back(ss.str());
     }
+
+   private:
+    const std::string fileName_;
 };
 
 class MyTestLoggerFactory : public LoggerFactory {
    public:
-    Logger *getLogger(const std::string &fileName) override { return logger; }
-
-   private:
-    MyTestLogger *logger = new MyTestLogger;
+    Logger *getLogger(const std::string &fileName) override { return new MyTestLogger(fileName); }
 };
 
 TEST(CustomLoggerTest, testCustomLogger) {
     // simulate new client created on a different thread (because logging factory is called once per thread)
-    auto testThread = std::thread([] {
+    std::atomic_int numLogLines{0};
+    auto testThread = std::thread([&numLogLines] {
         ClientConfiguration clientConfig;
         auto customLogFactory = new MyTestLoggerFactory();
         clientConfig.setLogger(customLogFactory);
         // reset to previous log factory
         Client client("pulsar://localhost:6650", clientConfig);
         client.close();
-        ASSERT_EQ(logLines.size(), 7);
+        ASSERT_TRUE(logLines.size() > 0);
+        for (auto &&line : logLines) {
+            std::cout << line;
+            std::cout.flush();
+        }
+        numLogLines = logLines.size();
         LogUtils::resetLoggerFactory();
     });
     testThread.join();
@@ -65,7 +73,7 @@ TEST(CustomLoggerTest, testCustomLogger) {
     Client client("pulsar://localhost:6650", clientConfig);
     client.close();
     // custom logger didn't get any new lines
-    ASSERT_EQ(logLines.size(), 7);
+    ASSERT_EQ(logLines.size(), numLogLines);
 }
 
 TEST(CustomLoggerTest, testConsoleLoggerFactory) {