You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/17 23:41:05 UTC
[incubator-doris] branch master updated: [Improvement][ASAN] make BE can exit normally and ASAN memory leak checking work (#9620)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 908f9cb7b9 [Improvement][ASAN] make BE can exit normally and ASAN memory leak checking work (#9620)
908f9cb7b9 is described below
commit 908f9cb7b9993ee45ea420e120fd1db160ab02c3
Author: jacktengg <18...@users.noreply.github.com>
AuthorDate: Wed May 18 07:40:57 2022 +0800
[Improvement][ASAN] make BE can exit normally and ASAN memory leak checking work (#9620)
---
be/src/runtime/minidump.cpp | 2 +-
be/src/util/priority_thread_pool.hpp | 19 +++++++++----------
be/src/util/priority_work_stealing_thread_pool.hpp | 19 +++++--------------
3 files changed, 15 insertions(+), 25 deletions(-)
diff --git a/be/src/runtime/minidump.cpp b/be/src/runtime/minidump.cpp
index cf1f0611f6..1c66771403 100644
--- a/be/src/runtime/minidump.cpp
+++ b/be/src/runtime/minidump.cpp
@@ -104,7 +104,7 @@ bool Minidump::_minidump_cb(const google_breakpad::MinidumpDescriptor& descripto
}
void Minidump::stop() {
- if (_stop) {
+ if (config::disable_minidump || _stop) {
return;
}
_stop = true;
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 20d90b3f41..616cb2475f 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -52,7 +52,6 @@ public:
// -- queue_size: the maximum size of the queue on which work items are offered. If the
// queue exceeds this size, subsequent calls to Offer will block until there is
// capacity available.
- // -- work_function: the function to run every time an item is consumed from the queue
PriorityThreadPool(uint32_t num_threads, uint32_t queue_size)
: _work_queue(queue_size), _shutdown(false) {
for (int i = 0; i < num_threads; ++i) {
@@ -118,6 +117,15 @@ public:
protected:
virtual bool is_shutdown() { return _shutdown; }
+ // Collection of worker threads that process work from the queue.
+ ThreadGroup _threads;
+
+ // Guards _empty_cv
+ std::mutex _lock;
+
+ // Signalled when the queue becomes empty
+ std::condition_variable _empty_cv;
+
private:
// Driver method for each thread in the pool. Continues to read work from the queue
// until the pool is shutdown.
@@ -137,17 +145,8 @@ private:
// FIFO order.
BlockingPriorityQueue<Task> _work_queue;
- // Collection of worker threads that process work from the queue.
- ThreadGroup _threads;
-
- // Guards _empty_cv
- std::mutex _lock;
-
// Set to true when threads should stop doing work and terminate.
std::atomic<bool> _shutdown;
-
- // Signalled when the queue becomes empty
- std::condition_variable _empty_cv;
};
} // namespace doris
diff --git a/be/src/util/priority_work_stealing_thread_pool.hpp b/be/src/util/priority_work_stealing_thread_pool.hpp
index c2f717a18f..937012722f 100644
--- a/be/src/util/priority_work_stealing_thread_pool.hpp
+++ b/be/src/util/priority_work_stealing_thread_pool.hpp
@@ -35,7 +35,6 @@ public:
// -- queue_size: the maximum size of the queue on which work items are offered. If the
// queue exceeds this size, subsequent calls to Offer will block until there is
// capacity available.
- // -- work_function: the function to run every time an item is consumed from the queue
PriorityWorkStealingThreadPool(uint32_t num_threads, uint32_t num_queues, uint32_t queue_size)
: PriorityThreadPool(0, 0) {
DCHECK_GT(num_queues, 0);
@@ -50,6 +49,11 @@ public:
}
}
+ virtual ~PriorityWorkStealingThreadPool() {
+ shutdown();
+ join();
+ }
+
// Blocking operation that puts a work item on the queue. If the queue is full, blocks
// until there is capacity available.
//
@@ -79,10 +83,6 @@ public:
}
}
- // Blocks until all threads are finished. shutdown does not need to have been called,
- // since it may be called on a separate thread.
- void join() override { _threads.join_all(); }
-
uint32_t get_queue_size() const override {
uint32_t size = 0;
for (auto work_queue : _work_queues) {
@@ -141,15 +141,6 @@ private:
// Queue on which work items are held until a thread is available to process them in
// FIFO order.
std::vector<std::shared_ptr<BlockingPriorityQueue<Task>>> _work_queues;
-
- // Collection of worker threads that process work from the queues.
- ThreadGroup _threads;
-
- // Guards _empty_cv
- std::mutex _lock;
-
- // Signalled when the queue becomes empty
- std::condition_variable _empty_cv;
};
} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org