You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/20 02:53:00 UTC

[doris] 02/03: [Improvement][ASAN] allow BE to exit normally and make ASAN memory leak checking work (#9620)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 9a28b26f723f8a8aa6176c00b7b7febe9616d51d
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Wed May 18 07:40:57 2022 +0800

    [Improvement][ASAN] allow BE to exit normally and make ASAN memory leak checking work (#9620)
---
 be/src/util/priority_thread_pool.hpp               | 20 ++++++++++----------
 be/src/util/priority_work_stealing_thread_pool.hpp | 19 +++++--------------
 2 files changed, 15 insertions(+), 24 deletions(-)

diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index ed0fe90157..db5700137e 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -53,7 +53,6 @@ public:
     //  -- queue_size: the maximum size of the queue on which work items are offered. If the
     //     queue exceeds this size, subsequent calls to Offer will block until there is
     //     capacity available.
-    //  -- work_function: the function to run every time an item is consumed from the queue
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size)
             : _work_queue(queue_size), _shutdown(false) {
         for (int i = 0; i < num_threads; ++i) {
@@ -117,6 +116,16 @@ public:
     }
 protected:
     virtual bool is_shutdown() { return _shutdown; }
+
+    // Collection of worker threads that process work from the queue.
+    ThreadGroup _threads;
+
+    // Guards _empty_cv
+    std::mutex _lock;
+
+    // Signalled when the queue becomes empty
+    std::condition_variable _empty_cv;
+
 private:
     // Driver method for each thread in the pool. Continues to read work from the queue
     // until the pool is shutdown.
@@ -136,17 +145,8 @@ private:
     // FIFO order.
     BlockingPriorityQueue<Task> _work_queue;
 
-    // Collection of worker threads that process work from the queue.
-    ThreadGroup _threads;
-
-    // Guards _empty_cv
-    std::mutex _lock;
-
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
-
-    // Signalled when the queue becomes empty
-    std::condition_variable _empty_cv;
 };
 
 } // namespace doris
diff --git a/be/src/util/priority_work_stealing_thread_pool.hpp b/be/src/util/priority_work_stealing_thread_pool.hpp
index 3c86098034..c11ee7d198 100644
--- a/be/src/util/priority_work_stealing_thread_pool.hpp
+++ b/be/src/util/priority_work_stealing_thread_pool.hpp
@@ -36,7 +36,6 @@ public:
     //  -- queue_size: the maximum size of the queue on which work items are offered. If the
     //     queue exceeds this size, subsequent calls to Offer will block until there is
     //     capacity available.
-    //  -- work_function: the function to run every time an item is consumed from the queue
     PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size)
             : PriorityThreadPool(0, 0) {
         DCHECK_GT(num_queues, 0);
@@ -51,6 +50,11 @@ public:
         }
     }
 
+    virtual ~PriorityWorkStealingThreadPool() {
+        shutdown();
+        join();
+    }
+
     // Blocking operation that puts a work item on the queue. If the queue is full, blocks
     // until there is capacity available.
     //
@@ -82,10 +86,6 @@ public:
         }
     }
 
-    // Blocks until all threads are finished. shutdown does not need to have been called,
-    // since it may be called on a separate thread.
-    void join() override { _threads.join_all(); }
-
     uint32_t get_queue_size() const override {
         uint32_t size = 0;
         for (auto work_queue : _work_queues) {
@@ -141,15 +141,6 @@ private:
     // Queue on which work items are held until a thread is available to process them in
     // FIFO order.
     std::vector<std::shared_ptr<BlockingPriorityQueue<Task>>> _work_queues;
-
-    // Collection of worker threads that process work from the queues.
-    ThreadGroup _threads;
-
-    // Guards _empty_cv
-    std::mutex _lock;
-
-    // Signalled when the queue becomes empty
-    std::condition_variable _empty_cv;
 };
 
 } // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org