You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/09/02 02:01:54 UTC

[incubator-doris] branch master updated: [Compaction][ThreadPool]Support adjust compaction threads num at runtime (#5781)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5719995  [Compaction][ThreadPool]Support adjust compaction threads num at runtime (#5781)
5719995 is described below

commit 57199955d6b1f0b8db747876d0b7f4f4bc318235
Author: weizuo93 <we...@apache.org>
AuthorDate: Thu Sep 2 10:01:44 2021 +0800

    [Compaction][ThreadPool]Support adjust compaction threads num at runtime (#5781)
    
    * Adjust the number of threads in the compaction thread pool dynamically
    
    Co-authored-by: weizuo <we...@xiaomi.com>
---
 be/src/common/config.h           |  2 +-
 be/src/olap/olap_server.cpp      | 24 ++++++++++++
 be/src/util/threadpool.cpp       | 76 ++++++++++++++++++++++++++++++--------
 be/src/util/threadpool.h         | 28 +++++++++++++-
 be/test/util/threadpool_test.cpp | 79 ++++++++++++++++++++++++++++++++++++++++
 5 files changed, 190 insertions(+), 19 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index eb42572..d31f25a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -280,7 +280,7 @@ CONF_mInt32(cumulative_compaction_skip_window_seconds, "30");
 CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min
 
 // This config can be set to limit thread number in compaction thread pool.
-CONF_Int32(max_compaction_threads, "10");
+CONF_mInt32(max_compaction_threads, "10"); // mutable: applied at runtime as the compaction pool's min and max threads
 
 // Thread count to do tablet meta checkpoint, -1 means use the data directories count.
 CONF_Int32(max_meta_checkpoint_threads, "-1");
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index bad51dc..851e807 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -333,6 +333,30 @@ void StorageEngine::_compaction_tasks_producer_callback() {
     int64_t interval = config::generate_compaction_tasks_min_interval_ms;
     do {
         if (!config::disable_auto_compaction) {
+            VLOG_CRITICAL << "compaction thread pool. num_threads: " << _compaction_thread_pool->num_threads()
+                      << ", num_threads_pending_start: " << _compaction_thread_pool->num_threads_pending_start()
+                      << ", num_active_threads: " << _compaction_thread_pool->num_active_threads()
+                      << ", max_threads: " << _compaction_thread_pool->max_threads()
+                      << ", min_threads: " << _compaction_thread_pool->min_threads()
+                      << ", num_total_queued_tasks: " << _compaction_thread_pool->get_queue_size();
+            // Read the mutable config once so both bounds get the same value; ignore values < 1.
+            const int32_t new_threads = config::max_compaction_threads;
+            const int old_min_threads = _compaction_thread_pool->min_threads();
+            const int old_max_threads = _compaction_thread_pool->max_threads();
+            if (new_threads > 0 && (old_min_threads != new_threads || old_max_threads != new_threads)) {
+                // set_min_threads() rejects min > max and set_max_threads() rejects max < min,
+                // so lower min_threads first when shrinking and raise max_threads first otherwise.
+                const bool shrink = new_threads < old_min_threads;
+                Status status = shrink ? _compaction_thread_pool->set_min_threads(new_threads)
+                                       : _compaction_thread_pool->set_max_threads(new_threads);
+                if (status.ok()) {
+                    status = shrink ? _compaction_thread_pool->set_max_threads(new_threads)
+                                    : _compaction_thread_pool->set_min_threads(new_threads);
+                }
+                LOG_IF(WARNING, !status.ok()) << "failed to update compaction thread pool threads to "
+                        << new_threads << ": " << status.to_string();
+                LOG_IF(INFO, status.ok()) << "update compaction thread pool min/max threads from "
+                        << old_min_threads << "/" << old_max_threads << " to " << new_threads;
+            }
             bool check_score = false;
             int64_t cur_time = UnixMillis();
             if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp
index b06a494..f7f194f 100644
--- a/be/src/util/threadpool.cpp
+++ b/be/src/util/threadpool.cpp
@@ -472,7 +472,7 @@ Status ThreadPool::do_submit(std::shared_ptr<Runnable> r, ThreadPoolToken* token
             }
             // If we failed to create a thread, but there are still some other
             // worker threads, log a warning message and continue.
-            LOG(ERROR) << "Thread pool failed to create thread: " << status.to_string();
+            LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string();
         }
     }
 
@@ -509,9 +509,6 @@ void ThreadPool::dispatch_thread() {
     DCHECK_GT(_num_threads_pending_start, 0);
     _num_threads++;
     _num_threads_pending_start--;
-    // If we are one of the first '_min_threads' to start, we must be
-    // a "permanent" thread.
-    bool permanent = _num_threads <= _min_threads;
 
     // Owned by this worker thread and added/removed from _idle_threads as needed.
     IdleThread me(&_lock);
@@ -523,6 +520,10 @@ void ThreadPool::dispatch_thread() {
             break;
         }
 
+        if (_num_threads + _num_threads_pending_start > _max_threads) {
+            break; // surplus worker: _max_threads was lowered (e.g. via set_max_threads())
+        }
+
         if (_queue.empty()) {
             // There's no work to do, let's go idle.
             //
@@ -536,21 +537,17 @@ void ThreadPool::dispatch_thread() {
                     _idle_threads.erase(_idle_threads.iterator_to(me));
                 }
             });
-            if (permanent) {
-                me.not_empty.wait();
-            } else {
-                if (!me.not_empty.wait_for(_idle_timeout)) {
-                    // After much investigation, it appears that pthread condition variables have
-                    // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
-                    // another thread did in fact signal. Apparently after a timeout there is some
-                    // brief period during which another thread may actually grab the internal mutex
-                    // protecting the state, signal, and release again before we get the mutex. So,
-                    // we'll recheck the empty queue case regardless.
-                    if (_queue.empty()) {
+            if (!me.not_empty.wait_for(_idle_timeout)) {
+                // After much investigation, it appears that pthread condition variables have
+                // a weird behavior in which they can return ETIMEDOUT from timed_wait even if
+                // another thread did in fact signal. Apparently after a timeout there is some
+                // brief period during which another thread may actually grab the internal mutex
+                // protecting the state, signal, and release again before we get the mutex. So,
+                // we'll recheck the empty queue case regardless.
+                if (_queue.empty() && _num_threads + _num_threads_pending_start > _min_threads) {
                         VLOG_NOTICE << "Releasing worker thread from pool " << _name << " after "
                                 << _idle_timeout.ToMilliseconds() << "ms of idle time.";
                         break;
-                    }
                 }
             }
             continue;
@@ -645,6 +642,53 @@ void ThreadPool::check_not_pool_thread_unlocked() {
     }
 }
 
+// Raises or lowers min_threads; immediately spawns workers if the pool is below the new minimum.
+Status ThreadPool::set_min_threads(int min_threads) {
+    MutexLock unique_lock(&_lock);
+    RETURN_IF_ERROR(_pool_status); // don't spawn threads for a pool that has been shut down
+    if (min_threads < 0 || min_threads > _max_threads) {
+        // min threads must be non-negative and can not be greater than max threads
+        return Status::InternalError("set thread pool min_threads failed");
+    }
+    _min_threads = min_threads;
+    // Count each thread as pending only right before creating it, so a failed
+    // create_thread() never leaves _num_threads_pending_start permanently inflated.
+    while (_num_threads + _num_threads_pending_start < _min_threads) {
+        _num_threads_pending_start++;
+        Status status = create_thread();
+        if (!status.ok()) {
+            _num_threads_pending_start--;
+            LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string();
+            return status;
+        }
+    }
+    return Status::OK();
+}
+
+// Raises or lowers max_threads; surplus workers exit on their next pass through dispatch_thread().
+Status ThreadPool::set_max_threads(int max_threads) {
+    MutexLock unique_lock(&_lock);
+    RETURN_IF_ERROR(_pool_status); // don't spawn threads for a pool that has been shut down
+    if (max_threads <= 0 || _min_threads > max_threads) {
+        // max threads must be positive and can not be less than min threads
+        return Status::InternalError("set thread pool max_threads failed");
+    }
+    _max_threads = max_threads;
+    // When growing, spawn at most one new worker per queued task.
+    int addition_threads = std::min(_max_threads - _num_threads - _num_threads_pending_start,
+                                    _total_queued_tasks);
+    for (int i = 0; i < addition_threads; i++) {
+        _num_threads_pending_start++; // count each thread as pending only right before creating it
+        Status status = create_thread();
+        if (!status.ok()) {
+            _num_threads_pending_start--;
+            LOG(WARNING) << "Thread pool failed to create thread: " << status.to_string();
+            return status;
+        }
+    }
+    return Status::OK();
+}
+
 std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) {
     return o << ThreadPoolToken::state_to_string(s);
 }
diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h
index 8d2f26e..ca813bf 100644
--- a/be/src/util/threadpool.h
+++ b/be/src/util/threadpool.h
@@ -27,6 +27,7 @@
 #include <unordered_set>
 #include <utility>
 
+#include "common/atomic.h"
 #include "common/status.h"
 #include "gutil/ref_counted.h"
 #include "util/condition_variable.h"
@@ -179,6 +180,9 @@ public:
     // Returns true if the pool reached the idle state, false otherwise.
     bool wait_for(const MonoDelta& delta);
 
+    Status set_min_threads(int min_threads); // non-OK if min_threads > max_threads(); may spawn workers
+    Status set_max_threads(int max_threads); // non-OK if max_threads < min_threads(); may spawn workers
+
     // Allocates a new token for use in token-based task submission. All tokens
     // must be destroyed before their ThreadPool is destroyed.
     //
@@ -199,6 +203,26 @@ public:
         return _num_threads + _num_threads_pending_start;
     }
 
+    // Current lower/upper bounds on the worker count; adjustable via set_min_threads()/set_max_threads().
+    int max_threads() const {
+        MutexLock l(&_lock);
+        return _max_threads;
+    }
+    int min_threads() const {
+        MutexLock l(&_lock);
+        return _min_threads;
+    }
+    // Threads whose creation was requested but that have not yet entered dispatch_thread().
+    int num_threads_pending_start() const {
+        MutexLock l(&_lock);
+        return _num_threads_pending_start;
+    }
+    // Value of _active_threads (presumably workers busy running a task -- TODO confirm).
+    int num_active_threads() const {
+        MutexLock l(&_lock);
+        return _active_threads;
+    }
+
     int get_queue_size() const {
         MutexLock l(&_lock);
         return _total_queued_tasks;
@@ -241,8 +265,8 @@ private:
     void release_token(ThreadPoolToken* t);
 
     const std::string _name;
-    const int _min_threads;
-    const int _max_threads;
+    int _min_threads; // no longer const: changed at runtime by set_min_threads() under _lock
+    int _max_threads; // no longer const: changed at runtime by set_max_threads() under _lock
     const int _max_queue_size;
     const MonoDelta _idle_timeout;
 
diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp
index b8ebebd..ef3d908 100644
--- a/be/test/util/threadpool_test.cpp
+++ b/be/test/util/threadpool_test.cpp
@@ -795,6 +795,85 @@ TEST_F(ThreadPoolTest, TestNormal) {
     ASSERT_EQ(0, token5->num_tasks());
 }
 
+TEST_F(ThreadPoolTest, TestThreadPoolDynamicAdjustMaximumMinimum) {
+    ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName)
+                                                  .set_min_threads(3)
+                                                  .set_max_threads(3)
+                                                  .set_idle_timeout(MonoDelta::FromMilliseconds(1)))
+                            .ok());
+
+    ASSERT_EQ(3, _pool->min_threads());
+    ASSERT_EQ(3, _pool->max_threads());
+    ASSERT_EQ(3, _pool->num_threads());
+
+    ASSERT_TRUE(!_pool->set_min_threads(4).ok());
+    ASSERT_TRUE(!_pool->set_max_threads(2).ok());
+
+    ASSERT_TRUE(_pool->set_min_threads(2).ok());
+    ASSERT_EQ(2, _pool->min_threads());
+    ASSERT_TRUE(_pool->set_max_threads(4).ok());
+    ASSERT_EQ(4, _pool->max_threads());
+
+    ASSERT_TRUE(_pool->set_min_threads(3).ok());
+    ASSERT_EQ(3, _pool->min_threads());
+    ASSERT_TRUE(_pool->set_max_threads(3).ok());
+    ASSERT_EQ(3, _pool->max_threads());
+
+    CountDownLatch latch_1(1);
+    CountDownLatch latch_2(1);
+    CountDownLatch latch_3(1);
+    CountDownLatch latch_4(1);
+    CountDownLatch latch_5(1);
+    CountDownLatch latch_6(1);
+    CountDownLatch latch_7(1);
+    CountDownLatch latch_8(1);
+    CountDownLatch latch_9(1);
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_1)).ok());
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_2)).ok());
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_3)).ok());
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_4)).ok());
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_5)).ok());
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_6)).ok());
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_7)).ok());
+    ASSERT_EQ(3, _pool->num_threads());
+    ASSERT_TRUE(_pool->set_max_threads(4).ok());
+    ASSERT_EQ(4, _pool->max_threads());
+    ASSERT_EQ(4, _pool->num_threads());
+    ASSERT_TRUE(_pool->set_max_threads(5).ok());
+    ASSERT_EQ(5, _pool->max_threads());
+    ASSERT_EQ(5, _pool->num_threads());
+    ASSERT_TRUE(_pool->set_max_threads(6).ok());
+    ASSERT_EQ(6, _pool->max_threads());
+    ASSERT_EQ(6, _pool->num_threads());
+    ASSERT_TRUE(_pool->set_max_threads(4).ok());
+    ASSERT_EQ(4, _pool->max_threads());
+    latch_1.count_down();
+    latch_2.count_down();
+    latch_3.count_down();
+    SleepFor(MonoDelta::FromMilliseconds(500));
+    ASSERT_EQ(4, _pool->num_threads());
+
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_8)).ok());
+    ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch_9)).ok());
+    ASSERT_EQ(4, _pool->num_threads());
+
+    ASSERT_TRUE(_pool->set_min_threads(2).ok());
+    ASSERT_EQ(2, _pool->min_threads());
+
+    latch_4.count_down();
+    latch_5.count_down();
+    latch_6.count_down();
+    latch_7.count_down();
+    latch_8.count_down();
+    latch_9.count_down();
+    SleepFor(MonoDelta::FromMilliseconds(500));
+    ASSERT_EQ(2, _pool->num_threads());
+
+    _pool->wait();
+    _pool->shutdown();
+    ASSERT_EQ(0, _pool->num_threads());
+}
+
 } // namespace doris
 
 int main(int argc, char* argv[]) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org