You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/09/02 02:01:54 UTC
[incubator-doris] branch master updated:
[Compaction][ThreadPool]Support adjust compaction threads num at runtime
(#5781)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5719995 [Compaction][ThreadPool]Support adjust compaction threads num at runtime (#5781)
5719995 is described below
commit 57199955d6b1f0b8db747876d0b7f4f4bc318235
Author: weizuo93 <we...@apache.org>
AuthorDate: Thu Sep 2 10:01:44 2021 +0800
[Compaction][ThreadPool]Support adjust compaction threads num at runtime (#5781)
* adjust thread number of compaction thread pool dynamically
Co-authored-by: weizuo <we...@xiaomi.com>
---
be/src/common/config.h | 2 +-
be/src/olap/olap_server.cpp | 24 ++++++++++++
be/src/util/threadpool.cpp | 76 ++++++++++++++++++++++++++++++--------
be/src/util/threadpool.h | 28 +++++++++++++-
be/test/util/threadpool_test.cpp | 79 ++++++++++++++++++++++++++++++++++++++++
5 files changed, 190 insertions(+), 19 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index eb42572..d31f25a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -280,7 +280,7 @@ CONF_mInt32(cumulative_compaction_skip_window_seconds, "30");
CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min
// This config can be set to limit thread number in compaction thread pool.
-CONF_Int32(max_compaction_threads, "10");
+CONF_mInt32(max_compaction_threads, "10");
// Thread count to do tablet meta checkpoint, -1 means use the data directories count.
CONF_Int32(max_meta_checkpoint_threads, "-1");
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index bad51dc..851e807 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -333,6 +333,30 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int64_t interval = config::generate_compaction_tasks_min_interval_ms;
do {
if (!config::disable_auto_compaction) {
+ VLOG_CRITICAL << "compaction thread pool. num_threads: " << _compaction_thread_pool->num_threads()
+ << ", num_threads_pending_start: " << _compaction_thread_pool->num_threads_pending_start()
+ << ", num_active_threads: " << _compaction_thread_pool->num_active_threads()
+ << ", max_threads: " << _compaction_thread_pool->max_threads()
+ << ", min_threads: " << _compaction_thread_pool->min_threads()
+ << ", num_total_queued_tasks: " << _compaction_thread_pool->get_queue_size();
+
+ if(_compaction_thread_pool->max_threads() != config::max_compaction_threads) {
+ int old_max_threads = _compaction_thread_pool->max_threads();
+ Status status = _compaction_thread_pool->set_max_threads(config::max_compaction_threads);
+ if (status.ok()) {
+ LOG(INFO) << "update compaction thread pool max_threads from "
+ << old_max_threads << " to " << config::max_compaction_threads;
+ }
+ }
+ if(_compaction_thread_pool->min_threads() != config::max_compaction_threads) {
+ int old_min_threads = _compaction_thread_pool->min_threads();
+ Status status = _compaction_thread_pool->set_min_threads(config::max_compaction_threads);
+ if (status.ok()) {
+ LOG(INFO) << "update compaction thread pool min_threads from "
+ << old_min_threads << " to " << config::max_compaction_threads;
+ }
+ }
+
bool check_score = false;
int64_t cur_time = UnixMillis();
if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index b06a494..f7f194f 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -472,7 +472,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
}
// If we failed to create a thread, but there are still some other
// worker threads, log a warning message and continue.
- LOG(ERROR) << "Thread pool failed to create thread: " << status.to_string();
+ LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string();
}
}
@@ -509,9 +509,6 @@ void ThreadPool::dispatch_thread() {
DCHECK_GT(_num_threads_pending_start, 0);
_num_threads++;
_num_threads_pending_start--;
- // If we are one of the first '_min_threads' to start, we must be
- // a "permanent" thread.
- bool permanent = _num_threads <= _min_threads;
// Owned by this worker thread and added/removed from _idle_threads as needed.
IdleThread me(&_lock);
@@ -523,6 +520,10 @@ void ThreadPool::dispatch_thread() {
break;
}
+ if (_num_threads + _num_threads_pending_start > _max_threads) {
+ break;
+ }
+
if (_queue.empty()) {
// There's no work to do, let's go idle.
//
@@ -536,21 +537,17 @@ void ThreadPool::dispatch_thread() {
_idle_threads.erase(_idle_threads.iterator_to(me));
}
});
- if (permanent) {
- me.not_empty.wait();
- } else {
- if (!me.not_empty.wait_for(_idle_timeout)) {
- // After much investigation, it appears that pthread condition variables have
- // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
- // another thread did in fact signal. Apparently after a timeout there is some
- // brief period during which another thread may actually grab the internal mutex
- // protecting the state, signal, and release again before we get the mutex. So,
- // we'll recheck the empty queue case regardless.
- if (_queue.empty()) {
+ if (!me.not_empty.wait_for(_idle_timeout)) {
+ // After much investigation, it appears that pthread condition variables have
+ // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
+ // another thread did in fact signal. Apparently after a timeout there is some
+ // brief period during which another thread may actually grab the internal mutex
+ // protecting the state, signal, and release again before we get the mutex. So,
+ // we'll recheck the empty queue case regardless.
+ if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
<< _idle_timeout.ToMilliseconds() << "ms of idle time.";
break;
- }
}
}
continue;
@@ -645,6 +642,53 @@ void ThreadPool::check_not_pool_thread_unlocked() {
}
}
+Status ThreadPool::set_min_threads(int min_threads) {
+ MutexLock unique_lock(&_lock);
+ if (min_threads > _max_threads) {
+ // min threads can not be set greater than max threads
+ return Status::InternalError("set thread pool min_threads failed");
+ }
+
+ _min_threads = min_threads;
+ if (min_threads > _num_threads + _num_threads_pending_start) {
+ int addition_threads = min_threads - _num_threads - _num_threads_pending_start;
+ _num_threads_pending_start += addition_threads;
+ for (int i = 0; i < addition_threads; i++) {
+ Status status = create_thread();
+ if (!status.ok()) {
+ _num_threads_pending_start--;
+ LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string();
+ return status;
+ }
+ }
+ }
+ return Status::OK();
+}
+
+Status ThreadPool::set_max_threads(int max_threads) {
+ MutexLock unique_lock(&_lock);
+ if (_min_threads > max_threads) {
+ // max threads can not be set less than min threads
+ return Status::InternalError("set thread pool max_threads failed");
+ }
+
+ _max_threads = max_threads;
+ if (_max_threads > _num_threads + _num_threads_pending_start) {
+ int addition_threads = _max_threads - _num_threads - _num_threads_pending_start;
+ addition_threads = std::min(addition_threads, _total_queued_tasks);
+ _num_threads_pending_start += addition_threads;
+ for (int i = 0; i < addition_threads; i++) {
+ Status status = create_thread();
+ if (!status.ok()) {
+ _num_threads_pending_start--;
+ LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string();
+ return status;
+ }
+ }
+ }
+ return Status::OK();
+}
+
std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
return o << ThreadPoolToken::state_to_string(s);
}
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 8d2f26e..ca813bf 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -27,6 +27,7 @@
#include <unordered_set>
#include <utility>
+#include "common/atomic.h"
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "util/condition_variable.h"
@@ -179,6 +180,9 @@ public:
// Returns true if the pool reached the idle state, false otherwise.
bool wait_for(const MonoDelta& delta);
+ Status set_min_threads(int min_threads);
+ Status set_max_threads(int max_threads);
+
// Allocates a new token for use in token-based task submission. All tokens
// must be destroyed before their ThreadPool is destroyed.
//
@@ -199,6 +203,26 @@ public:
return _num_threads + _num_threads_pending_start;
}
+ int max_threads() const {
+ MutexLock l(&_lock);
+ return _max_threads;
+ }
+
+ int min_threads() const {
+ MutexLock l(&_lock);
+ return _min_threads;
+ }
+
+ int num_threads_pending_start() const {
+ MutexLock l(&_lock);
+ return _num_threads_pending_start;
+ }
+
+ int num_active_threads() const {
+ MutexLock l(&_lock);
+ return _active_threads;
+ }
+
int get_queue_size() const {
MutexLock l(&_lock);
return _total_queued_tasks;
@@ -241,8 +265,8 @@ private:
void release_token(ThreadPoolToken* t);
const std::string _name;
- const int _min_threads;
- const int _max_threads;
+ int _min_threads;
+ int _max_threads;
const int _max_queue_size;
const MonoDelta _idle_timeout;
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index b8ebebd..ef3d908 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -795,6 +795,85 @@ TEST_F(ThreadPoolTest, TestNormal) {
ASSERT_EQ(0, token5->num_tasks());
}
+TEST_F(ThreadPoolTest, TestThreadPoolDynamicAdjustMaximumMinimum) {
+ ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
+ .set_min_threads(3)
+ .set_max_threads(3)
+ .set_idle_timeout(MonoDelta::FromMilliseconds(1)))
+ .ok());
+
+ ASSERT_EQ(3, _pool->min_threads());
+ ASSERT_EQ(3, _pool->max_threads());
+ ASSERT_EQ(3, _pool->num_threads());
+
+ ASSERT_TRUE(!_pool->set_min_threads(4).ok());
+ ASSERT_TRUE(!_pool->set_max_threads(2).ok());
+
+ ASSERT_TRUE(_pool->set_min_threads(2).ok());
+ ASSERT_EQ(2, _pool->min_threads());
+ ASSERT_TRUE(_pool->set_max_threads(4).ok());
+ ASSERT_EQ(4, _pool->max_threads());
+
+ ASSERT_TRUE(_pool->set_min_threads(3).ok());
+ ASSERT_EQ(3, _pool->min_threads());
+ ASSERT_TRUE(_pool->set_max_threads(3).ok());
+ ASSERT_EQ(3, _pool->max_threads());
+
+ CountDownLatch latch_1(1);
+ CountDownLatch latch_2(1);
+ CountDownLatch latch_3(1);
+ CountDownLatch latch_4(1);
+ CountDownLatch latch_5(1);
+ CountDownLatch latch_6(1);
+ CountDownLatch latch_7(1);
+ CountDownLatch latch_8(1);
+ CountDownLatch latch_9(1);
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_1)).ok());
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_2)).ok());
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_3)).ok());
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_4)).ok());
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_5)).ok());
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_6)).ok());
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_7)).ok());
+ ASSERT_EQ(3, _pool->num_threads());
+ ASSERT_TRUE(_pool->set_max_threads(4).ok());
+ ASSERT_EQ(4, _pool->max_threads());
+ ASSERT_EQ(4, _pool->num_threads());
+ ASSERT_TRUE(_pool->set_max_threads(5).ok());
+ ASSERT_EQ(5, _pool->max_threads());
+ ASSERT_EQ(5, _pool->num_threads());
+ ASSERT_TRUE(_pool->set_max_threads(6).ok());
+ ASSERT_EQ(6, _pool->max_threads());
+ ASSERT_EQ(6, _pool->num_threads());
+ ASSERT_TRUE(_pool->set_max_threads(4).ok());
+ ASSERT_EQ(4, _pool->max_threads());
+ latch_1.count_down();
+ latch_2.count_down();
+ latch_3.count_down();
+ SleepFor(MonoDelta::FromMilliseconds(500));
+ ASSERT_EQ(4, _pool->num_threads());
+
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_8)).ok());
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_9)).ok());
+ ASSERT_EQ(4, _pool->num_threads());
+
+ ASSERT_TRUE(_pool->set_min_threads(2).ok());
+ ASSERT_EQ(2, _pool->min_threads());
+
+ latch_4.count_down();
+ latch_5.count_down();
+ latch_6.count_down();
+ latch_7.count_down();
+ latch_8.count_down();
+ latch_9.count_down();
+ SleepFor(MonoDelta::FromMilliseconds(500));
+ ASSERT_EQ(2, _pool->num_threads());
+
+ _pool->wait();
+ _pool->shutdown();
+ ASSERT_EQ(0, _pool->num_threads());
+}
+
} // namespace doris
int main(int argc, char* argv[]) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org