You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2020/02/18 10:39:15 UTC

[incubator-doris] branch master updated: Use ThreadPool to refactor MemTableFlushExecutor (#2931)

This is an automated email from the ASF dual-hosted git repository.

lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1cf0fb9  Use ThreadPool to refactor MemTableFlushExecutor (#2931)
1cf0fb9 is described below

commit 1cf0fb9117a38f94c88d4ca4be9daed6f690af1a
Author: lichaoyong <li...@baidu.com>
AuthorDate: Tue Feb 18 18:39:04 2020 +0800

    Use ThreadPool to refactor MemTableFlushExecutor (#2931)
    
    1. MemTableFlushExecutor maintain a ThreadPool to receive FlushTask.
    2. FlushToken is used to seperate different tasks from different tablets.
       Every DeltaWriter of tablet constructs a FlushToken,
       task in FlushToken are handle serially, task between FlushToken are
       handle concurrently.
    3. I have remove thread limit on data_dir, because of I/O is not the main
       timer consumer of Flush thread. Much of time is consumed in CPU decoding
       and compress.
---
 be/src/exec/olap_scan_node.cpp                     |   1 -
 be/src/olap/delta_writer.cpp                       |  21 ++-
 be/src/olap/delta_writer.h                         |   4 +-
 be/src/olap/memtable_flush_executor.cpp            | 162 +++++----------------
 be/src/olap/memtable_flush_executor.h              | 131 ++++-------------
 be/src/runtime/dpp_sink.cpp                        |   2 +-
 be/src/runtime/exec_env.h                          |   5 +-
 be/src/runtime/exec_env_init.cpp                   |   3 +-
 be/src/runtime/fragment_mgr.cpp                    |   4 +-
 be/src/runtime/fragment_mgr.h                      |   4 +-
 be/src/runtime/routine_load/data_consumer_group.h  |   4 +-
 .../routine_load/routine_load_task_executor.h      |   4 +-
 be/src/runtime/runtime_state.h                     |   1 -
 be/src/runtime/tablets_channel.h                   |   2 +-
 be/src/service/internal_service.h                  |   4 +-
 be/src/util/priority_thread_pool.hpp               |   8 +-
 be/src/util/thread_pool.hpp                        | 154 --------------------
 be/test/olap/skiplist_test.cpp                     |   4 +-
 be/test/runtime/load_channel_mgr_test.cpp          |   1 +
 19 files changed, 102 insertions(+), 417 deletions(-)

diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index cc3ebd9..db769c6 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -36,7 +36,6 @@
 #include "runtime/string_value.h"
 #include "runtime/tuple_row.h"
 #include "util/runtime_profile.h"
-#include "util/thread_pool.hpp"
 #include "util/debug_util.h"
 #include "util/priority_thread_pool.hpp"
 #include "agent/cgroups_mgr.h"
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 1aea7d6..0a89bc4 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -50,10 +50,9 @@ DeltaWriter::~DeltaWriter() {
         return;
     }
 
-    if (_flush_handler != nullptr) {
+    if (_flush_token != nullptr) {
         // cancel and wait all memtables in flush queue to be finished
-        _flush_handler->cancel();
-        _flush_handler->wait();
+        _flush_token->cancel();
     }
 
     if (_tablet != nullptr) {
@@ -150,8 +149,7 @@ OLAPStatus DeltaWriter::init() {
     _reset_mem_table();
 
     // create flush handler
-    RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_handler(
-            _tablet->data_dir()->path_hash(), &_flush_handler));
+    RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token));
 
     _is_init = true;
     return OLAP_SUCCESS;
@@ -175,7 +173,7 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
 }
 
 OLAPStatus DeltaWriter::_flush_memtable_async() {
-    return _flush_handler->submit(_mem_table);
+    return _flush_token->submit(_mem_table);
 }
 
 OLAPStatus DeltaWriter::flush_memtable_and_wait() {
@@ -190,7 +188,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
         // this means there should be at least one memtable in flush queue.
     }
     // wait all memtables in flush queue to be flushed.
-    RETURN_NOT_OK(_flush_handler->wait());
+    RETURN_NOT_OK(_flush_token->wait());
     return OLAP_SUCCESS;
 }
 
@@ -218,7 +216,7 @@ OLAPStatus DeltaWriter::close() {
 OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
     DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called";
     // return error if previous flush failed
-    RETURN_NOT_OK(_flush_handler->wait());
+    RETURN_NOT_OK(_flush_token->wait());
     DCHECK_EQ(_mem_tracker->consumption(), 0);
 
     // use rowset meta manager to save meta
@@ -268,7 +266,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
 
     _delta_written_success = true;
 
-    const FlushStatistic& stat = _flush_handler->get_stats();
+    const FlushStatistic& stat = _flush_token->get_stats();
     LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() << ", stats: " << stat;
     return OLAP_SUCCESS;
 }
@@ -278,10 +276,9 @@ OLAPStatus DeltaWriter::cancel() {
         return OLAP_SUCCESS;
     }
     _mem_table.reset();
-    if (_flush_handler != nullptr) {
+    if (_flush_token != nullptr) {
         // cancel and wait all memtables in flush queue to be finished
-        _flush_handler->cancel();
-        _flush_handler->wait();
+        _flush_token->cancel();
     }
     DCHECK_EQ(_mem_tracker->consumption(), 0);
     return OLAP_SUCCESS;
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index dbf1326..64828d5 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -24,7 +24,7 @@
 
 namespace doris {
 
-class FlushHandler;
+class FlushToken;
 class MemTable;
 class MemTracker;
 class Schema;
@@ -106,7 +106,7 @@ private:
     bool _delta_written_success;
 
     StorageEngine* _storage_engine;
-    std::shared_ptr<FlushHandler> _flush_handler;
+    std::unique_ptr<FlushToken> _flush_token;
     std::unique_ptr<MemTracker> _mem_tracker;
 };
 
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 51d5f23..00bf34e 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -24,146 +24,62 @@
 #include "olap/memtable.h"
 #include "runtime/exec_env.h"
 #include "runtime/mem_tracker.h"
+#include "util/scoped_cleanup.h"
 
 namespace doris {
 
-OLAPStatus FlushHandler::submit(std::shared_ptr<MemTable> memtable) {
-    RETURN_NOT_OK(_last_flush_status.load());
-    MemTableFlushContext ctx;
-    ctx.memtable = std::move(memtable);
-    ctx.flush_handler = this->shared_from_this();
-    _counter_cond.inc();
-    VLOG(5) << "submitting " << *(ctx.memtable) << " to flush queue " << _flush_queue_idx;
-    RETURN_NOT_OK(_flush_executor->_push_memtable(_flush_queue_idx, ctx));
-    return OLAP_SUCCESS;
-}
-
-OLAPStatus FlushHandler::wait() {
-    // wait all submitted tasks to be finished or cancelled
-    _counter_cond.block_wait();
-    return _last_flush_status.load();
-}
-
-void FlushHandler::on_flush_finished(const FlushResult& res) {
-    if (res.flush_status != OLAP_SUCCESS) {
-        _last_flush_status.store(res.flush_status);
-    } else {
-        _stats.flush_time_ns.fetch_add(res.flush_time_ns);
-        _stats.flush_count.fetch_add(1);
-    }
-    _counter_cond.dec();
+std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
+    os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
+       << ", flush count=" << stat.flush_count << ")";
+    return os;
 }
 
-OLAPStatus MemTableFlushExecutor::create_flush_handler(
-        size_t path_hash, std::shared_ptr<FlushHandler>* flush_handler) {
-    size_t flush_queue_idx = _get_queue_idx(path_hash);
-    flush_handler->reset(new FlushHandler(flush_queue_idx, this));
+OLAPStatus FlushToken::submit(std::shared_ptr<MemTable> memtable) {
+    _flush_token->submit_func(boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable));
     return OLAP_SUCCESS;
 }
 
-void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
-    int32_t data_dir_num = data_dirs.size();
-    _thread_num_per_store = std::max(1, config::flush_thread_num_per_store);
-    _num_threads = data_dir_num * _thread_num_per_store;
-
-    // create flush queues
-    for (int i = 0; i < _num_threads; ++i) {
-        BlockingQueue<MemTableFlushContext>* queue = new BlockingQueue<MemTableFlushContext>(10);
-        _flush_queues.push_back(queue);
-    }
-    // create thread pool
-    _flush_pool = new ThreadPool(_num_threads, 1);
-    for (int32_t i = 0; i < _num_threads; ++i) {
-       _flush_pool->offer(std::bind<void>(&MemTableFlushExecutor::_flush_memtable, this, i));
-    }
-
-    // _path_map saves the path hash to current idx of flush queue.
-    // eg.
-    // there are 4 data stores, each store has 2 work thread.
-    // so there are 8(= 4 * 2) queues in _flush_queues.
-    // and the path hash of the 4 paths are mapped to idx 0, 2, 4, 6.
-    int32_t group = 0;
-    for (auto store : data_dirs) {
-        _path_map[store->path_hash()] = group;
-        group += _thread_num_per_store;
-    }
+void FlushToken::cancel() { 
+    _flush_token->shutdown();
 }
 
-MemTableFlushExecutor::~MemTableFlushExecutor() {
-    // shutdown queues
-    for (auto queue : _flush_queues) {
-        queue->shutdown();
-    }
-
-    // shutdown thread pool
-    _flush_pool->shutdown();
-    _flush_pool->join();
-
-    // delete queue
-    for (auto queue : _flush_queues) {
-        delete queue;
-    }
-    _flush_queues.clear();
-
-    delete _flush_pool;
+OLAPStatus FlushToken::wait() { 
+    _flush_token->wait();
+    return _flush_status;
 }
 
-size_t MemTableFlushExecutor::_get_queue_idx(size_t path_hash) {
-    std::lock_guard<SpinLock> l(_lock);
-    size_t cur_idx = _path_map[path_hash];
-    size_t group = cur_idx / _thread_num_per_store;
-    size_t next_idx = group * _thread_num_per_store + ((cur_idx + 1) % _thread_num_per_store);
-    DCHECK(next_idx < _num_threads);
-    _path_map[path_hash] = next_idx;
-    return cur_idx;
-}
-
-OLAPStatus MemTableFlushExecutor::_push_memtable(int32_t queue_idx, MemTableFlushContext& ctx) {
-    if (!_flush_queues[queue_idx]->blocking_put(ctx)) {
-        return OLAP_ERR_OTHER_ERROR;
+void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
+    MonotonicStopWatch timer;
+    timer.start();
+    _flush_status = memtable->flush();
+    SCOPED_CLEANUP({
+        memtable.reset();
+    });
+    if (_flush_status != OLAP_SUCCESS) {
+        return;
     }
 
-    return OLAP_SUCCESS;
+    _stats.flush_time_ns += timer.elapsed_time();
+    _stats.flush_count++;
+    _stats.flush_size_bytes += memtable->memory_usage();
+    LOG(INFO) << "flushed " << *(memtable) << " in " << _stats.flush_time_ns / 1000 / 1000
+              << " ms, status=" << _flush_status;
 }
 
-void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) {
-    while (true) {
-        MemTableFlushContext ctx;
-        if (!_flush_queues[queue_idx]->blocking_get(&ctx)) {
-            // queue is empty and shutdown, end of thread
-            return;
-        }
-
-        // if last flush of this tablet already failed, just skip
-        if (ctx.flush_handler->is_cancelled()) {
-            VLOG(5) << "skip flushing " << *(ctx.memtable) << " due to cancellation";
-            // must release memtable before notifying
-            ctx.memtable.reset();
-            ctx.flush_handler->on_flush_cancelled();
-            continue;
-        }
-
-        // flush the memtable
-        VLOG(5) << "begin to flush " << *(ctx.memtable);
-        FlushResult res;
-        MonotonicStopWatch timer;
-        timer.start();
-        res.flush_status = ctx.memtable->flush();
-        res.flush_time_ns = timer.elapsed_time();
-        res.flush_size_bytes = ctx.memtable->memory_usage();
-        VLOG(5) << "flushed " << *(ctx.memtable) << " in " << res.flush_time_ns / 1000 / 1000
-                << " ms, status=" << res.flush_status;
-        // must release memtable before notifying
-        ctx.memtable.reset();
-        // callback
-        ctx.flush_handler->on_flush_finished(res);
-    }
+void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
+    int32_t data_dir_num = data_dirs.size();
+    size_t min_threads = std::max(1, config::flush_thread_num_per_store);
+    size_t max_threads = data_dir_num * min_threads;
+    ThreadPoolBuilder("MemTableFlushThreadPool")
+        .set_min_threads(min_threads)
+        .set_max_threads(max_threads)
+        .build(&_flush_pool);
 }
 
-std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
-    os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
-       << ", flush count=" << stat.flush_count << ")";
-    return os;
+// create a flush token
+OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) {
+    flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+    return OLAP_SUCCESS;
 }
 
-} // end of namespac
+} // namespace doris
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index ef84e29..bb58e3f 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,19 +17,13 @@
 
 #pragma once
 
-#include <atomic>
 #include <cstdint>
 #include <memory>
-#include <queue>
 #include <vector>
-#include <unordered_map>
 #include <utility>
 
-#include "util/blocking_queue.hpp"
-#include "util/counter_cond_variable.hpp"
-#include "util/spinlock.h"
-#include "util/thread_pool.hpp"
 #include "olap/olap_define.h"
+#include "util/threadpool.h"
 
 namespace doris {
 
@@ -38,132 +32,65 @@ class DeltaWriter;
 class ExecEnv;
 class MemTable;
 
-// The context for a memtable to be flushed.
-class FlushHandler;
-struct MemTableFlushContext {
-    // memtable to be flushed
-    std::shared_ptr<MemTable> memtable;
-    // flush handler from a delta writer.
-    // use shared ptr because flush_handler may be deleted before this
-    // memtable being flushed. so we need to make sure the flush_handler
-    // is alive until this memtable being flushed.
-    std::shared_ptr<FlushHandler> flush_handler;
-};
-
-// the flush result of a single memtable flush
-struct FlushResult {
-    OLAPStatus flush_status;
-    int64_t flush_time_ns = 0;
-    int64_t flush_size_bytes = 0;
-};
-
 // the statistic of a certain flush handler.
 // use atomic because it may be updated by multi threads
 struct FlushStatistic {
-    std::atomic<std::int64_t> flush_time_ns = {0};
-    std::atomic<std::int64_t> flush_count= {0};
+    int64_t flush_time_ns = 0;
+    int64_t flush_count= 0;
+    int64_t flush_size_bytes = 0;
 };
 
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
 
-class MemTableFlushExecutor;
-
-// flush handler is for flushing memtables in a delta writer
-// This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception
-// when calling submit();
-class FlushHandler : public std::enable_shared_from_this<FlushHandler> {
+// A thin wrapper of ThreadPoolToken to submit task.
+class FlushToken {
 public:
-    FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor) :
-            _flush_queue_idx(flush_queue_idx),
-            _last_flush_status(OLAP_SUCCESS),
-            _counter_cond(0),
-            _flush_executor(flush_executor),
-            _is_cancelled(false) {
-    }
-
-    // submit a memtable to flush. return error if some previous submitted MemTable has failed
-    OLAPStatus submit(std::shared_ptr<MemTable> memtable);
-    // wait for all memtables submitted by itself to be finished.
+    explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
+        : _flush_status(OLAP_SUCCESS),
+          _flush_token(std::move(flush_pool_token)) {}
+
+    OLAPStatus submit(std::shared_ptr<MemTable> mem_table);
+
+    // error has happpens, so we cancel this token
+    // And remove all tasks in the queue.
+    void cancel();
+
+    // wait all tasks in token to be completed.
     OLAPStatus wait();
+
     // get flush operations' statistics
     const FlushStatistic& get_stats() const { return _stats; }
 
-    bool is_cancelled() {
-        return _last_flush_status.load() != OLAP_SUCCESS || _is_cancelled.load();
-    }
-    void cancel() { _is_cancelled.store(true); }
-
-    // These on_xxx() methods are callback when flush finishes or cancels, user should
-    // not call them directly.
-    // called when a memtable is finished by executor.
-    void on_flush_finished(const FlushResult& res);
-    // called when a flush memtable execution is cancelled
-    void on_flush_cancelled() {
-        _counter_cond.dec();
-    }
-
 private:
-    // flush queue idx in memtable flush executor
-    int32_t _flush_queue_idx;
-    // the flush status of last memtable
-    std::atomic<OLAPStatus> _last_flush_status;
-    // used to wait/notify the memtable flush execution
-    CounterCondVariable _counter_cond;
+    void _flush_memtable(std::shared_ptr<MemTable> mem_table);
 
+    OLAPStatus _flush_status;
+    std::unique_ptr<ThreadPoolToken> _flush_token;
     FlushStatistic _stats;
-    MemTableFlushExecutor* _flush_executor;
-
-    // the caller of the flush handler can set this variable to notify that the
-    // uppper application is already cancelled.
-    std::atomic<bool> _is_cancelled;
 };
 
 // MemTableFlushExecutor is responsible for flushing memtables to disk.
-// Each data directory has a specified number of worker threads and each thread will correspond
-// to a queue. The only job of each worker thread is to take memtable from its corresponding
-// flush queue and writes the data to disk.
-//
-// NOTE: User SHOULD NOT call method of this class directly, use pattern should be:
+// It encapsulate a ThreadPool to handle all tasks.
+// Usage Example:
 //      ...
 //      std::shared_ptr<FlushHandler> flush_handler;
-//      memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler);
+//      memTableFlushExecutor.create_flush_token(path_hash, &flush_handler);
 //      ...
-//      flush_handler->submit(memtable)
+//      flush_token->submit(memtable)
 //      ...
 class MemTableFlushExecutor {
 public:
     MemTableFlushExecutor() {}
-    ~MemTableFlushExecutor();
+    ~MemTableFlushExecutor() {}
 
     // init should be called after storage engine is opened,
     // because it needs path hash of each data dir.
     void init(const std::vector<DataDir*>& data_dirs);
 
-    // create a flush handler to access the flush executor
-    OLAPStatus create_flush_handler(size_t path_hash, std::shared_ptr<FlushHandler>* flush_handler);
+    OLAPStatus create_flush_token(std::unique_ptr<FlushToken>* flush_token);
+
 private:
-    friend class FlushHandler;
-
-    // given the path hash, return the next idx of flush queue.
-    // eg.
-    // path A is mapped to idx 0 and 1, so each time get_queue_idx(A) is called,
-    // 0 and 1 will returned alternately.
-    size_t _get_queue_idx(size_t path_hash);
-
-    // push the memtable to specified flush queue
-    OLAPStatus _push_memtable(int32_t queue_idx, MemTableFlushContext& ctx);
-
-    void _flush_memtable(int32_t queue_idx);
-
-    int32_t _thread_num_per_store;
-    int32_t _num_threads;
-    ThreadPool* _flush_pool;
-    // the size of this vector should equal to _num_threads
-    std::vector<BlockingQueue<MemTableFlushContext>*> _flush_queues;
-    // lock to protect _path_map
-    SpinLock _lock;
-    // path hash -> queue idx of _flush_queues;
-    std::unordered_map<size_t, size_t> _path_map;
+    std::unique_ptr<ThreadPool> _flush_pool;
 };
 
 } // end namespace
diff --git a/be/src/runtime/dpp_sink.cpp b/be/src/runtime/dpp_sink.cpp
index 43d3a0b..bea1c96 100644
--- a/be/src/runtime/dpp_sink.cpp
+++ b/be/src/runtime/dpp_sink.cpp
@@ -35,7 +35,7 @@
 #include "gen_cpp/Types_types.h"
 #include "util/countdown_latch.h"
 #include "util/debug_util.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
 #include "olap/field.h"
 
 namespace doris {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 125f039..076b904 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -47,7 +47,6 @@ class ResultQueueMgr;
 class TMasterInfo;
 class LoadChannelMgr;
 class TestExecEnv;
-class ThreadPool;
 class ThreadResourceMgr;
 class TmpFileMgr;
 class WebPageHandler;
@@ -106,7 +105,7 @@ public:
     PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; }
     ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
     PriorityThreadPool* thread_pool() { return _thread_pool; }
-    ThreadPool* etl_thread_pool() { return _etl_thread_pool; }
+    PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
     CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
     FragmentMgr* fragment_mgr() { return _fragment_mgr; }
     TMasterInfo* master_info() { return _master_info; }
@@ -157,7 +156,7 @@ private:
     PoolMemTrackerRegistry* _pool_mem_trackers = nullptr;
     ThreadResourceMgr* _thread_mgr = nullptr;
     PriorityThreadPool* _thread_pool = nullptr;
-    ThreadPool* _etl_thread_pool = nullptr;
+    PriorityThreadPool* _etl_thread_pool = nullptr;
     CgroupsMgr* _cgroups_mgr = nullptr;
     FragmentMgr* _fragment_mgr = nullptr;
     TMasterInfo* _master_info = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8b4a28c..47cc38b 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -55,7 +55,6 @@
 #include "util/brpc_stub_cache.h"
 #include "util/priority_thread_pool.hpp"
 #include "agent/cgroups_mgr.h"
-#include "util/thread_pool.hpp"
 #include "gen_cpp/BackendService.h"
 #include "gen_cpp/FrontendService.h"
 #include "gen_cpp/TPaloBrokerService.h"
@@ -86,7 +85,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
     _thread_pool = new PriorityThreadPool(
         config::doris_scanner_thread_pool_thread_num,
         config::doris_scanner_thread_pool_queue_size);
-    _etl_thread_pool = new ThreadPool(
+    _etl_thread_pool = new PriorityThreadPool(
         config::etl_thread_pool_size,
         config::etl_thread_pool_queue_size);
     _cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a60dc47..fec4c5d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -411,7 +411,7 @@ Status FragmentMgr::exec_plan_fragment(
 }
 
 static void* fragment_executor(void* param) {
-    ThreadPool::WorkFunction* func = (ThreadPool::WorkFunction*)param;
+    PriorityThreadPool::WorkFunction* func = (PriorityThreadPool::WorkFunction*)param;
     (*func)();
     delete func;
     return nullptr;
@@ -469,7 +469,7 @@ Status FragmentMgr::exec_plan_fragment(
         int ret = pthread_create(&id,
                        nullptr,
                        fragment_executor,
-                       new ThreadPool::WorkFunction(
+                       new PriorityThreadPool::WorkFunction(
                            std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb)));
         if (ret != 0) {
             std::string err_msg("Could not create thread.");
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 5eb04db..74b25c4 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -29,7 +29,7 @@
 #include "gen_cpp/DorisExternalService_types.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
 #include "util/hash_util.hpp"
 #include "http/rest_monitor_iface.h"
 
@@ -90,7 +90,7 @@ private:
     bool _stop;
     std::thread _cancel_thread;
     // every job is a pool
-    ThreadPool _thread_pool;
+    PriorityThreadPool _thread_pool;
 
 };
 
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index 52b756b..9083835 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -19,7 +19,7 @@
 
 #include "runtime/routine_load/data_consumer.h"
 #include "util/blocking_queue.hpp"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
 
 namespace doris {
 
@@ -59,7 +59,7 @@ protected:
     UniqueId _grp_id;
     std::vector<std::shared_ptr<DataConsumer>> _consumers;
     // thread pool to run each consumer in multi thread
-    ThreadPool _thread_pool;
+    PriorityThreadPool _thread_pool;
     // mutex to protect counter.
     // the counter is init as the number of consumers.
     // once a consumer is done, decrease the counter.
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index 2bc88f4..3700e80 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -22,7 +22,7 @@
 #include <mutex>
 
 #include "runtime/routine_load/data_consumer_pool.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
 #include "util/uid_util.h"
 
 #include "gen_cpp/internal_service.pb.h"
@@ -74,7 +74,7 @@ private:
 
 private:
     ExecEnv* _exec_env;
-    ThreadPool _thread_pool;
+    PriorityThreadPool _thread_pool;
     DataConsumerPool _data_consumer_pool;
 
     std::mutex _lock;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 9589035..688e7fb 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -51,7 +51,6 @@ class DateTimeValue;
 class MemTracker;
 class DataStreamRecvr;
 class ResultBufferMgr;
-class ThreadPool;
 class DiskIoMgrs;
 class TmpFileMgr;
 class BufferedBlockMgr;
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 25c6cb1..cb865cf 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -23,7 +23,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "util/bitmap.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
 #include "util/uid_util.h"
 
 #include "gen_cpp/Types_types.h"
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 957bb79..e27c2c3 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -20,7 +20,7 @@
 #include "common/status.h"
 #include "gen_cpp/internal_service.pb.h"
 #include "gen_cpp/palo_internal_service.pb.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
 
 namespace brpc {
 class Controller;
@@ -90,7 +90,7 @@ private:
     Status _exec_plan_fragment(brpc::Controller* cntl);
 private:
     ExecEnv* _exec_env;
-    ThreadPool _tablet_worker_pool;
+    PriorityThreadPool _tablet_worker_pool;
 };
 
 }
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index faa00ba..68f50d7 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -56,7 +56,6 @@ public:
     //     capacity available.
     //  -- work_function: the function to run every time an item is consumed from the queue
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size) :
-            _thread_num(num_threads),
             _work_queue(queue_size),
             _shutdown(false) {
         for (int i = 0; i < num_threads; ++i) {
@@ -88,6 +87,11 @@ public:
         return _work_queue.blocking_put(task);
     }
 
+    bool offer(WorkFunction func) {
+        PriorityThreadPool::Task task = {0, func};
+        return _work_queue.blocking_put(task);
+    }
+
     // Shuts the thread pool down, causing the work queue to cease accepting offered work
     // and the worker threads to terminate once they have processed their current work item.
     // Returns once the shutdown flag has been set, does not wait for the threads to
@@ -145,8 +149,6 @@ private:
         return _shutdown;
     }
 
-    uint32_t _thread_num;
-
     // Queue on which work items are held until a thread is available to process them in
     // FIFO order.
     BlockingPriorityQueue<Task> _work_queue;
diff --git a/be/src/util/thread_pool.hpp b/be/src/util/thread_pool.hpp
deleted file mode 100644
index 9aa1feb..0000000
--- a/be/src/util/thread_pool.hpp
+++ /dev/null
@@ -1,154 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_COMMON_UTIL_THREAD_POOL_HPP
-#define DORIS_BE_SRC_COMMON_UTIL_THREAD_POOL_HPP
-
-#include "util/blocking_queue.hpp"
-
-#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/bind/mem_fn.hpp>
-
-namespace doris {
-
-// Simple threadpool which processes items (of type T) in parallel which were placed on a
-// blocking queue by Offer(). Each item is processed by a single user-supplied method.
-class ThreadPool {
-public:
-    // Signature of a work-processing function. Takes the integer id of the thread which is
-    // calling it (ids run from 0 to num_threads - 1) and a reference to the item to
-    // process.
-    typedef boost::function<void ()> WorkFunction;
-
-    // Creates a new thread pool and start num_threads threads.
-    //  -- num_threads: how many threads are part of this pool
-    //  -- queue_size: the maximum size of the queue on which work items are offered. If the
-    //     queue exceeds this size, subsequent calls to Offer will block until there is
-    //     capacity available.
-    //  -- work_function: the function to run every time an item is consumed from the queue
-    ThreadPool(uint32_t num_threads, uint32_t queue_size) : 
-            _work_queue(queue_size),
-            _shutdown(false) {
-        for (int i = 0; i < num_threads; ++i) {
-            _threads.create_thread(
-                    boost::bind<void>(boost::mem_fn(&ThreadPool::work_thread), this, i));
-        }
-    }
-
-    // Destructor ensures that all threads are terminated before this object is freed
-    // otherwise they may continue to run and reference member variables
-    ~ThreadPool() {
-        shutdown();
-        join();
-    }
-
-    // Blocking operation that puts a work item on the queue. If the queue is full, blocks
-    // until there is capacity available.
-    //
-    // 'work' is copied into the work queue, but may be referenced at any time in the
-    // future. Therefore the caller needs to ensure that any data referenced by work (if T
-    // is, e.g., a pointer type) remains valid until work has been processed, and it's up to
-    // the caller to provide their own signalling mechanism to detect this (or to wait until
-    // after DrainAndShutdown returns).
-    //
-    // Returns true if the work item was successfully added to the queue, false otherwise
-    // (which typically means that the thread pool has already been shut down).
-    bool offer(WorkFunction func) {
-        return _work_queue.blocking_put(func);
-    }
-
-    // Shuts the thread pool down, causing the work queue to cease accepting offered work
-    // and the worker threads to terminate once they have processed their current work item.
-    // Returns once the shutdown flag has been set, does not wait for the threads to
-    // terminate.
-    void shutdown() {
-        {
-            boost::lock_guard<boost::mutex> l(_lock);
-            _shutdown = true;
-        }
-        _work_queue.shutdown();
-    }
-
-    // Blocks until all threads are finished. Shutdown does not need to have been called,
-    // since it may be called on a separate thread.
-    void join() {
-        _threads.join_all();
-    }
-
-    uint32_t get_queue_size() const {
-        return _work_queue.get_size();
-    }
-
-    // Blocks until the work queue is empty, and then calls Shutdown to stop the worker
-    // threads and Join to wait until they are finished.
-    // Any work Offer()'ed during drain_and_shutdown may or may not be processed.
-    void drain_and_shutdown() {
-        {
-            boost::unique_lock<boost::mutex> l(_lock);
-
-            while (_work_queue.get_size() != 0) {
-                _empty_cv.wait(l);
-            }
-        }
-        shutdown();
-        join();
-    }
-
-private:
-    // Driver method for each thread in the pool. Continues to read work from the queue
-    // until the pool is shutdown.
-    void work_thread(int thread_id) {
-        while (!is_shutdown()) {
-            WorkFunction work_function;
-
-            if (_work_queue.blocking_get(&work_function)) {
-                work_function();
-            }
-
-            if (_work_queue.get_size() == 0) {
-                _empty_cv.notify_all();
-            }
-        }
-    }
-
-    // Returns value of _shutdown under a lock, forcing visibility to threads in the pool.
-    bool is_shutdown() {
-        boost::lock_guard<boost::mutex> l(_lock);
-        return _shutdown;
-    }
-
-    // Queue on which work items are held until a thread is available to process them in
-    // FIFO order.
-    BlockingQueue<WorkFunction> _work_queue;
-
-    // Collection of worker threads that process work from the queue.
-    boost::thread_group _threads;
-
-    // Guards _shutdown and _empty_cv
-    boost::mutex _lock;
-
-    // Set to true when threads should stop doing work and terminate.
-    bool _shutdown;
-
-    // Signalled when the queue becomes empty
-    boost::condition_variable _empty_cv;
-};
-
-}
-
-#endif
diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp
index 0e82465..bfaf738 100644
--- a/be/test/olap/skiplist_test.cpp
+++ b/be/test/olap/skiplist_test.cpp
@@ -28,7 +28,7 @@
 #include "util/random.h"
 #include "util/condition_variable.h"
 #include "util/mutex.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
 
 namespace doris {
 
@@ -411,7 +411,7 @@ static void run_concurrent(int run) {
     Random rnd(seed);
     const int N = 1000;
     const int kSize = 1000;
-    ThreadPool thread_pool(10, 100);
+    PriorityThreadPool thread_pool(10, 100);
     for (int i = 0; i < N; i++) {
         if ((i % 100) == 0) {
             fprintf(stderr, "Run %d of %d\n", i, N);
diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp
index 8417a13..bda51c4 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -32,6 +32,7 @@
 #include "runtime/descriptor_helper.h"
 #include "util/thrift_util.h"
 #include "olap/delta_writer.h"
+#include "olap/memtable_flush_executor.h"
 #include "olap/schema.h"
 #include "olap/storage_engine.h"
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org