You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2020/02/18 10:39:15 UTC
[incubator-doris] branch master updated: Use ThreadPool to refactor
MemTableFlushExecutor (#2931)
This is an automated email from the ASF dual-hosted git repository.
lichaoyong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1cf0fb9 Use ThreadPool to refactor MemTableFlushExecutor (#2931)
1cf0fb9 is described below
commit 1cf0fb9117a38f94c88d4ca4be9daed6f690af1a
Author: lichaoyong <li...@baidu.com>
AuthorDate: Tue Feb 18 18:39:04 2020 +0800
Use ThreadPool to refactor MemTableFlushExecutor (#2931)
1. MemTableFlushExecutor maintains a ThreadPool to receive flush tasks.
2. FlushToken is used to separate the tasks of different tablets.
Every DeltaWriter of a tablet constructs a FlushToken;
tasks within one FlushToken are handled serially, while tasks in different
FlushTokens are handled concurrently.
3. I have removed the thread limit on data_dir, because I/O is not the main
time consumer of the flush thread. Much of the time is consumed by CPU work:
decoding and compression.
---
be/src/exec/olap_scan_node.cpp | 1 -
be/src/olap/delta_writer.cpp | 21 ++-
be/src/olap/delta_writer.h | 4 +-
be/src/olap/memtable_flush_executor.cpp | 162 +++++----------------
be/src/olap/memtable_flush_executor.h | 131 ++++-------------
be/src/runtime/dpp_sink.cpp | 2 +-
be/src/runtime/exec_env.h | 5 +-
be/src/runtime/exec_env_init.cpp | 3 +-
be/src/runtime/fragment_mgr.cpp | 4 +-
be/src/runtime/fragment_mgr.h | 4 +-
be/src/runtime/routine_load/data_consumer_group.h | 4 +-
.../routine_load/routine_load_task_executor.h | 4 +-
be/src/runtime/runtime_state.h | 1 -
be/src/runtime/tablets_channel.h | 2 +-
be/src/service/internal_service.h | 4 +-
be/src/util/priority_thread_pool.hpp | 8 +-
be/src/util/thread_pool.hpp | 154 --------------------
be/test/olap/skiplist_test.cpp | 4 +-
be/test/runtime/load_channel_mgr_test.cpp | 1 +
19 files changed, 102 insertions(+), 417 deletions(-)
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index cc3ebd9..db769c6 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -36,7 +36,6 @@
#include "runtime/string_value.h"
#include "runtime/tuple_row.h"
#include "util/runtime_profile.h"
-#include "util/thread_pool.hpp"
#include "util/debug_util.h"
#include "util/priority_thread_pool.hpp"
#include "agent/cgroups_mgr.h"
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 1aea7d6..0a89bc4 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -50,10 +50,9 @@ DeltaWriter::~DeltaWriter() {
return;
}
- if (_flush_handler != nullptr) {
+ if (_flush_token != nullptr) {
// cancel and wait all memtables in flush queue to be finished
- _flush_handler->cancel();
- _flush_handler->wait();
+ _flush_token->cancel();
}
if (_tablet != nullptr) {
@@ -150,8 +149,7 @@ OLAPStatus DeltaWriter::init() {
_reset_mem_table();
// create flush handler
- RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_handler(
- _tablet->data_dir()->path_hash(), &_flush_handler));
+ RETURN_NOT_OK(_storage_engine->memtable_flush_executor()->create_flush_token(&_flush_token));
_is_init = true;
return OLAP_SUCCESS;
@@ -175,7 +173,7 @@ OLAPStatus DeltaWriter::write(Tuple* tuple) {
}
OLAPStatus DeltaWriter::_flush_memtable_async() {
- return _flush_handler->submit(_mem_table);
+ return _flush_token->submit(_mem_table);
}
OLAPStatus DeltaWriter::flush_memtable_and_wait() {
@@ -190,7 +188,7 @@ OLAPStatus DeltaWriter::flush_memtable_and_wait() {
// this means there should be at least one memtable in flush queue.
}
// wait all memtables in flush queue to be flushed.
- RETURN_NOT_OK(_flush_handler->wait());
+ RETURN_NOT_OK(_flush_token->wait());
return OLAP_SUCCESS;
}
@@ -218,7 +216,7 @@ OLAPStatus DeltaWriter::close() {
OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInfo>* tablet_vec) {
DCHECK(_is_init) << "delta writer is supposed be to initialized before close_wait() being called";
// return error if previous flush failed
- RETURN_NOT_OK(_flush_handler->wait());
+ RETURN_NOT_OK(_flush_token->wait());
DCHECK_EQ(_mem_tracker->consumption(), 0);
// use rowset meta manager to save meta
@@ -268,7 +266,7 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
_delta_written_success = true;
- const FlushStatistic& stat = _flush_handler->get_stats();
+ const FlushStatistic& stat = _flush_token->get_stats();
LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() << ", stats: " << stat;
return OLAP_SUCCESS;
}
@@ -278,10 +276,9 @@ OLAPStatus DeltaWriter::cancel() {
return OLAP_SUCCESS;
}
_mem_table.reset();
- if (_flush_handler != nullptr) {
+ if (_flush_token != nullptr) {
// cancel and wait all memtables in flush queue to be finished
- _flush_handler->cancel();
- _flush_handler->wait();
+ _flush_token->cancel();
}
DCHECK_EQ(_mem_tracker->consumption(), 0);
return OLAP_SUCCESS;
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index dbf1326..64828d5 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -24,7 +24,7 @@
namespace doris {
-class FlushHandler;
+class FlushToken;
class MemTable;
class MemTracker;
class Schema;
@@ -106,7 +106,7 @@ private:
bool _delta_written_success;
StorageEngine* _storage_engine;
- std::shared_ptr<FlushHandler> _flush_handler;
+ std::unique_ptr<FlushToken> _flush_token;
std::unique_ptr<MemTracker> _mem_tracker;
};
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 51d5f23..00bf34e 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -24,146 +24,62 @@
#include "olap/memtable.h"
#include "runtime/exec_env.h"
#include "runtime/mem_tracker.h"
+#include "util/scoped_cleanup.h"
namespace doris {
-OLAPStatus FlushHandler::submit(std::shared_ptr<MemTable> memtable) {
- RETURN_NOT_OK(_last_flush_status.load());
- MemTableFlushContext ctx;
- ctx.memtable = std::move(memtable);
- ctx.flush_handler = this->shared_from_this();
- _counter_cond.inc();
- VLOG(5) << "submitting " << *(ctx.memtable) << " to flush queue " << _flush_queue_idx;
- RETURN_NOT_OK(_flush_executor->_push_memtable(_flush_queue_idx, ctx));
- return OLAP_SUCCESS;
-}
-
-OLAPStatus FlushHandler::wait() {
- // wait all submitted tasks to be finished or cancelled
- _counter_cond.block_wait();
- return _last_flush_status.load();
-}
-
-void FlushHandler::on_flush_finished(const FlushResult& res) {
- if (res.flush_status != OLAP_SUCCESS) {
- _last_flush_status.store(res.flush_status);
- } else {
- _stats.flush_time_ns.fetch_add(res.flush_time_ns);
- _stats.flush_count.fetch_add(1);
- }
- _counter_cond.dec();
+std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
+ os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
+ << ", flush count=" << stat.flush_count << ")";
+ return os;
}
-OLAPStatus MemTableFlushExecutor::create_flush_handler(
- size_t path_hash, std::shared_ptr<FlushHandler>* flush_handler) {
- size_t flush_queue_idx = _get_queue_idx(path_hash);
- flush_handler->reset(new FlushHandler(flush_queue_idx, this));
+OLAPStatus FlushToken::submit(std::shared_ptr<MemTable> memtable) {
+ _flush_token->submit_func(boost::bind(boost::mem_fn(&FlushToken::_flush_memtable), this, memtable));
return OLAP_SUCCESS;
}
-void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
- int32_t data_dir_num = data_dirs.size();
- _thread_num_per_store = std::max(1, config::flush_thread_num_per_store);
- _num_threads = data_dir_num * _thread_num_per_store;
-
- // create flush queues
- for (int i = 0; i < _num_threads; ++i) {
- BlockingQueue<MemTableFlushContext>* queue = new BlockingQueue<MemTableFlushContext>(10);
- _flush_queues.push_back(queue);
- }
- // create thread pool
- _flush_pool = new ThreadPool(_num_threads, 1);
- for (int32_t i = 0; i < _num_threads; ++i) {
- _flush_pool->offer(std::bind<void>(&MemTableFlushExecutor::_flush_memtable, this, i));
- }
-
- // _path_map saves the path hash to current idx of flush queue.
- // eg.
- // there are 4 data stores, each store has 2 work thread.
- // so there are 8(= 4 * 2) queues in _flush_queues.
- // and the path hash of the 4 paths are mapped to idx 0, 2, 4, 6.
- int32_t group = 0;
- for (auto store : data_dirs) {
- _path_map[store->path_hash()] = group;
- group += _thread_num_per_store;
- }
+void FlushToken::cancel() {
+ _flush_token->shutdown();
}
-MemTableFlushExecutor::~MemTableFlushExecutor() {
- // shutdown queues
- for (auto queue : _flush_queues) {
- queue->shutdown();
- }
-
- // shutdown thread pool
- _flush_pool->shutdown();
- _flush_pool->join();
-
- // delete queue
- for (auto queue : _flush_queues) {
- delete queue;
- }
- _flush_queues.clear();
-
- delete _flush_pool;
+OLAPStatus FlushToken::wait() {
+ _flush_token->wait();
+ return _flush_status;
}
-size_t MemTableFlushExecutor::_get_queue_idx(size_t path_hash) {
- std::lock_guard<SpinLock> l(_lock);
- size_t cur_idx = _path_map[path_hash];
- size_t group = cur_idx / _thread_num_per_store;
- size_t next_idx = group * _thread_num_per_store + ((cur_idx + 1) % _thread_num_per_store);
- DCHECK(next_idx < _num_threads);
- _path_map[path_hash] = next_idx;
- return cur_idx;
-}
-
-OLAPStatus MemTableFlushExecutor::_push_memtable(int32_t queue_idx, MemTableFlushContext& ctx) {
- if (!_flush_queues[queue_idx]->blocking_put(ctx)) {
- return OLAP_ERR_OTHER_ERROR;
+void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable) {
+ MonotonicStopWatch timer;
+ timer.start();
+ _flush_status = memtable->flush();
+ SCOPED_CLEANUP({
+ memtable.reset();
+ });
+ if (_flush_status != OLAP_SUCCESS) {
+ return;
}
- return OLAP_SUCCESS;
+ _stats.flush_time_ns += timer.elapsed_time();
+ _stats.flush_count++;
+ _stats.flush_size_bytes += memtable->memory_usage();
+ LOG(INFO) << "flushed " << *(memtable) << " in " << _stats.flush_time_ns / 1000 / 1000
+ << " ms, status=" << _flush_status;
}
-void MemTableFlushExecutor::_flush_memtable(int32_t queue_idx) {
- while (true) {
- MemTableFlushContext ctx;
- if (!_flush_queues[queue_idx]->blocking_get(&ctx)) {
- // queue is empty and shutdown, end of thread
- return;
- }
-
- // if last flush of this tablet already failed, just skip
- if (ctx.flush_handler->is_cancelled()) {
- VLOG(5) << "skip flushing " << *(ctx.memtable) << " due to cancellation";
- // must release memtable before notifying
- ctx.memtable.reset();
- ctx.flush_handler->on_flush_cancelled();
- continue;
- }
-
- // flush the memtable
- VLOG(5) << "begin to flush " << *(ctx.memtable);
- FlushResult res;
- MonotonicStopWatch timer;
- timer.start();
- res.flush_status = ctx.memtable->flush();
- res.flush_time_ns = timer.elapsed_time();
- res.flush_size_bytes = ctx.memtable->memory_usage();
- VLOG(5) << "flushed " << *(ctx.memtable) << " in " << res.flush_time_ns / 1000 / 1000
- << " ms, status=" << res.flush_status;
- // must release memtable before notifying
- ctx.memtable.reset();
- // callback
- ctx.flush_handler->on_flush_finished(res);
- }
+void MemTableFlushExecutor::init(const std::vector<DataDir*>& data_dirs) {
+ int32_t data_dir_num = data_dirs.size();
+ size_t min_threads = std::max(1, config::flush_thread_num_per_store);
+ size_t max_threads = data_dir_num * min_threads;
+ ThreadPoolBuilder("MemTableFlushThreadPool")
+ .set_min_threads(min_threads)
+ .set_max_threads(max_threads)
+ .build(&_flush_pool);
}
-std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
- os << "(flush time(ms)=" << stat.flush_time_ns / 1000 / 1000
- << ", flush count=" << stat.flush_count << ")";
- return os;
+// create a flush token
+OLAPStatus MemTableFlushExecutor::create_flush_token(std::unique_ptr<FlushToken>* flush_token) {
+ flush_token->reset(new FlushToken(_flush_pool->new_token(ThreadPool::ExecutionMode::SERIAL)));
+ return OLAP_SUCCESS;
}
-} // end of namespac
+} // namespace doris
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index ef84e29..bb58e3f 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -17,19 +17,13 @@
#pragma once
-#include <atomic>
#include <cstdint>
#include <memory>
-#include <queue>
#include <vector>
-#include <unordered_map>
#include <utility>
-#include "util/blocking_queue.hpp"
-#include "util/counter_cond_variable.hpp"
-#include "util/spinlock.h"
-#include "util/thread_pool.hpp"
#include "olap/olap_define.h"
+#include "util/threadpool.h"
namespace doris {
@@ -38,132 +32,65 @@ class DeltaWriter;
class ExecEnv;
class MemTable;
-// The context for a memtable to be flushed.
-class FlushHandler;
-struct MemTableFlushContext {
- // memtable to be flushed
- std::shared_ptr<MemTable> memtable;
- // flush handler from a delta writer.
- // use shared ptr because flush_handler may be deleted before this
- // memtable being flushed. so we need to make sure the flush_handler
- // is alive until this memtable being flushed.
- std::shared_ptr<FlushHandler> flush_handler;
-};
-
-// the flush result of a single memtable flush
-struct FlushResult {
- OLAPStatus flush_status;
- int64_t flush_time_ns = 0;
- int64_t flush_size_bytes = 0;
-};
-
// the statistic of a certain flush handler.
// use atomic because it may be updated by multi threads
struct FlushStatistic {
- std::atomic<std::int64_t> flush_time_ns = {0};
- std::atomic<std::int64_t> flush_count= {0};
+ int64_t flush_time_ns = 0;
+ int64_t flush_count= 0;
+ int64_t flush_size_bytes = 0;
};
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat);
-class MemTableFlushExecutor;
-
-// flush handler is for flushing memtables in a delta writer
-// This class must be wrapped by std::shared_ptr, or you will get bad_weak_ptr exception
-// when calling submit();
-class FlushHandler : public std::enable_shared_from_this<FlushHandler> {
+// A thin wrapper of ThreadPoolToken to submit task.
+class FlushToken {
public:
- FlushHandler(int32_t flush_queue_idx, MemTableFlushExecutor* flush_executor) :
- _flush_queue_idx(flush_queue_idx),
- _last_flush_status(OLAP_SUCCESS),
- _counter_cond(0),
- _flush_executor(flush_executor),
- _is_cancelled(false) {
- }
-
- // submit a memtable to flush. return error if some previous submitted MemTable has failed
- OLAPStatus submit(std::shared_ptr<MemTable> memtable);
- // wait for all memtables submitted by itself to be finished.
+ explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
+ : _flush_status(OLAP_SUCCESS),
+ _flush_token(std::move(flush_pool_token)) {}
+
+ OLAPStatus submit(std::shared_ptr<MemTable> mem_table);
+
+ // error has happpens, so we cancel this token
+ // And remove all tasks in the queue.
+ void cancel();
+
+ // wait all tasks in token to be completed.
OLAPStatus wait();
+
// get flush operations' statistics
const FlushStatistic& get_stats() const { return _stats; }
- bool is_cancelled() {
- return _last_flush_status.load() != OLAP_SUCCESS || _is_cancelled.load();
- }
- void cancel() { _is_cancelled.store(true); }
-
- // These on_xxx() methods are callback when flush finishes or cancels, user should
- // not call them directly.
- // called when a memtable is finished by executor.
- void on_flush_finished(const FlushResult& res);
- // called when a flush memtable execution is cancelled
- void on_flush_cancelled() {
- _counter_cond.dec();
- }
-
private:
- // flush queue idx in memtable flush executor
- int32_t _flush_queue_idx;
- // the flush status of last memtable
- std::atomic<OLAPStatus> _last_flush_status;
- // used to wait/notify the memtable flush execution
- CounterCondVariable _counter_cond;
+ void _flush_memtable(std::shared_ptr<MemTable> mem_table);
+ OLAPStatus _flush_status;
+ std::unique_ptr<ThreadPoolToken> _flush_token;
FlushStatistic _stats;
- MemTableFlushExecutor* _flush_executor;
-
- // the caller of the flush handler can set this variable to notify that the
- // uppper application is already cancelled.
- std::atomic<bool> _is_cancelled;
};
// MemTableFlushExecutor is responsible for flushing memtables to disk.
-// Each data directory has a specified number of worker threads and each thread will correspond
-// to a queue. The only job of each worker thread is to take memtable from its corresponding
-// flush queue and writes the data to disk.
-//
-// NOTE: User SHOULD NOT call method of this class directly, use pattern should be:
+// It encapsulate a ThreadPool to handle all tasks.
+// Usage Example:
// ...
// std::shared_ptr<FlushHandler> flush_handler;
-// memTableFlushExecutor.create_flush_handler(path_hash, &flush_handler);
+// memTableFlushExecutor.create_flush_token(path_hash, &flush_handler);
// ...
-// flush_handler->submit(memtable)
+// flush_token->submit(memtable)
// ...
class MemTableFlushExecutor {
public:
MemTableFlushExecutor() {}
- ~MemTableFlushExecutor();
+ ~MemTableFlushExecutor() {}
// init should be called after storage engine is opened,
// because it needs path hash of each data dir.
void init(const std::vector<DataDir*>& data_dirs);
- // create a flush handler to access the flush executor
- OLAPStatus create_flush_handler(size_t path_hash, std::shared_ptr<FlushHandler>* flush_handler);
+ OLAPStatus create_flush_token(std::unique_ptr<FlushToken>* flush_token);
+
private:
- friend class FlushHandler;
-
- // given the path hash, return the next idx of flush queue.
- // eg.
- // path A is mapped to idx 0 and 1, so each time get_queue_idx(A) is called,
- // 0 and 1 will returned alternately.
- size_t _get_queue_idx(size_t path_hash);
-
- // push the memtable to specified flush queue
- OLAPStatus _push_memtable(int32_t queue_idx, MemTableFlushContext& ctx);
-
- void _flush_memtable(int32_t queue_idx);
-
- int32_t _thread_num_per_store;
- int32_t _num_threads;
- ThreadPool* _flush_pool;
- // the size of this vector should equal to _num_threads
- std::vector<BlockingQueue<MemTableFlushContext>*> _flush_queues;
- // lock to protect _path_map
- SpinLock _lock;
- // path hash -> queue idx of _flush_queues;
- std::unordered_map<size_t, size_t> _path_map;
+ std::unique_ptr<ThreadPool> _flush_pool;
};
} // end namespace
diff --git a/be/src/runtime/dpp_sink.cpp b/be/src/runtime/dpp_sink.cpp
index 43d3a0b..bea1c96 100644
--- a/be/src/runtime/dpp_sink.cpp
+++ b/be/src/runtime/dpp_sink.cpp
@@ -35,7 +35,7 @@
#include "gen_cpp/Types_types.h"
#include "util/countdown_latch.h"
#include "util/debug_util.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
#include "olap/field.h"
namespace doris {
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 125f039..076b904 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -47,7 +47,6 @@ class ResultQueueMgr;
class TMasterInfo;
class LoadChannelMgr;
class TestExecEnv;
-class ThreadPool;
class ThreadResourceMgr;
class TmpFileMgr;
class WebPageHandler;
@@ -106,7 +105,7 @@ public:
PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; }
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
PriorityThreadPool* thread_pool() { return _thread_pool; }
- ThreadPool* etl_thread_pool() { return _etl_thread_pool; }
+ PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
TMasterInfo* master_info() { return _master_info; }
@@ -157,7 +156,7 @@ private:
PoolMemTrackerRegistry* _pool_mem_trackers = nullptr;
ThreadResourceMgr* _thread_mgr = nullptr;
PriorityThreadPool* _thread_pool = nullptr;
- ThreadPool* _etl_thread_pool = nullptr;
+ PriorityThreadPool* _etl_thread_pool = nullptr;
CgroupsMgr* _cgroups_mgr = nullptr;
FragmentMgr* _fragment_mgr = nullptr;
TMasterInfo* _master_info = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8b4a28c..47cc38b 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -55,7 +55,6 @@
#include "util/brpc_stub_cache.h"
#include "util/priority_thread_pool.hpp"
#include "agent/cgroups_mgr.h"
-#include "util/thread_pool.hpp"
#include "gen_cpp/BackendService.h"
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/TPaloBrokerService.h"
@@ -86,7 +85,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_thread_pool = new PriorityThreadPool(
config::doris_scanner_thread_pool_thread_num,
config::doris_scanner_thread_pool_queue_size);
- _etl_thread_pool = new ThreadPool(
+ _etl_thread_pool = new PriorityThreadPool(
config::etl_thread_pool_size,
config::etl_thread_pool_queue_size);
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a60dc47..fec4c5d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -411,7 +411,7 @@ Status FragmentMgr::exec_plan_fragment(
}
static void* fragment_executor(void* param) {
- ThreadPool::WorkFunction* func = (ThreadPool::WorkFunction*)param;
+ PriorityThreadPool::WorkFunction* func = (PriorityThreadPool::WorkFunction*)param;
(*func)();
delete func;
return nullptr;
@@ -469,7 +469,7 @@ Status FragmentMgr::exec_plan_fragment(
int ret = pthread_create(&id,
nullptr,
fragment_executor,
- new ThreadPool::WorkFunction(
+ new PriorityThreadPool::WorkFunction(
std::bind<void>(&FragmentMgr::exec_actual, this, exec_state, cb)));
if (ret != 0) {
std::string err_msg("Could not create thread.");
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 5eb04db..74b25c4 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -29,7 +29,7 @@
#include "gen_cpp/DorisExternalService_types.h"
#include "gen_cpp/Types_types.h"
#include "gen_cpp/internal_service.pb.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
#include "util/hash_util.hpp"
#include "http/rest_monitor_iface.h"
@@ -90,7 +90,7 @@ private:
bool _stop;
std::thread _cancel_thread;
// every job is a pool
- ThreadPool _thread_pool;
+ PriorityThreadPool _thread_pool;
};
diff --git a/be/src/runtime/routine_load/data_consumer_group.h b/be/src/runtime/routine_load/data_consumer_group.h
index 52b756b..9083835 100644
--- a/be/src/runtime/routine_load/data_consumer_group.h
+++ b/be/src/runtime/routine_load/data_consumer_group.h
@@ -19,7 +19,7 @@
#include "runtime/routine_load/data_consumer.h"
#include "util/blocking_queue.hpp"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
namespace doris {
@@ -59,7 +59,7 @@ protected:
UniqueId _grp_id;
std::vector<std::shared_ptr<DataConsumer>> _consumers;
// thread pool to run each consumer in multi thread
- ThreadPool _thread_pool;
+ PriorityThreadPool _thread_pool;
// mutex to protect counter.
// the counter is init as the number of consumers.
// once a consumer is done, decrease the counter.
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.h b/be/src/runtime/routine_load/routine_load_task_executor.h
index 2bc88f4..3700e80 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.h
+++ b/be/src/runtime/routine_load/routine_load_task_executor.h
@@ -22,7 +22,7 @@
#include <mutex>
#include "runtime/routine_load/data_consumer_pool.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
#include "util/uid_util.h"
#include "gen_cpp/internal_service.pb.h"
@@ -74,7 +74,7 @@ private:
private:
ExecEnv* _exec_env;
- ThreadPool _thread_pool;
+ PriorityThreadPool _thread_pool;
DataConsumerPool _data_consumer_pool;
std::mutex _lock;
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 9589035..688e7fb 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -51,7 +51,6 @@ class DateTimeValue;
class MemTracker;
class DataStreamRecvr;
class ResultBufferMgr;
-class ThreadPool;
class DiskIoMgrs;
class TmpFileMgr;
class BufferedBlockMgr;
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 25c6cb1..cb865cf 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -23,7 +23,7 @@
#include "runtime/descriptors.h"
#include "runtime/mem_tracker.h"
#include "util/bitmap.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
#include "util/uid_util.h"
#include "gen_cpp/Types_types.h"
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 957bb79..e27c2c3 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -20,7 +20,7 @@
#include "common/status.h"
#include "gen_cpp/internal_service.pb.h"
#include "gen_cpp/palo_internal_service.pb.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
namespace brpc {
class Controller;
@@ -90,7 +90,7 @@ private:
Status _exec_plan_fragment(brpc::Controller* cntl);
private:
ExecEnv* _exec_env;
- ThreadPool _tablet_worker_pool;
+ PriorityThreadPool _tablet_worker_pool;
};
}
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index faa00ba..68f50d7 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -56,7 +56,6 @@ public:
// capacity available.
// -- work_function: the function to run every time an item is consumed from the queue
PriorityThreadPool(uint32_t num_threads, uint32_t queue_size) :
- _thread_num(num_threads),
_work_queue(queue_size),
_shutdown(false) {
for (int i = 0; i < num_threads; ++i) {
@@ -88,6 +87,11 @@ public:
return _work_queue.blocking_put(task);
}
+ bool offer(WorkFunction func) {
+ PriorityThreadPool::Task task = {0, func};
+ return _work_queue.blocking_put(task);
+ }
+
// Shuts the thread pool down, causing the work queue to cease accepting offered work
// and the worker threads to terminate once they have processed their current work item.
// Returns once the shutdown flag has been set, does not wait for the threads to
@@ -145,8 +149,6 @@ private:
return _shutdown;
}
- uint32_t _thread_num;
-
// Queue on which work items are held until a thread is available to process them in
// FIFO order.
BlockingPriorityQueue<Task> _work_queue;
diff --git a/be/src/util/thread_pool.hpp b/be/src/util/thread_pool.hpp
deleted file mode 100644
index 9aa1feb..0000000
--- a/be/src/util/thread_pool.hpp
+++ /dev/null
@@ -1,154 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_SRC_COMMON_UTIL_THREAD_POOL_HPP
-#define DORIS_BE_SRC_COMMON_UTIL_THREAD_POOL_HPP
-
-#include "util/blocking_queue.hpp"
-
-#include <boost/thread.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/bind/mem_fn.hpp>
-
-namespace doris {
-
-// Simple threadpool which processes items (of type T) in parallel which were placed on a
-// blocking queue by Offer(). Each item is processed by a single user-supplied method.
-class ThreadPool {
-public:
- // Signature of a work-processing function. Takes the integer id of the thread which is
- // calling it (ids run from 0 to num_threads - 1) and a reference to the item to
- // process.
- typedef boost::function<void ()> WorkFunction;
-
- // Creates a new thread pool and start num_threads threads.
- // -- num_threads: how many threads are part of this pool
- // -- queue_size: the maximum size of the queue on which work items are offered. If the
- // queue exceeds this size, subsequent calls to Offer will block until there is
- // capacity available.
- // -- work_function: the function to run every time an item is consumed from the queue
- ThreadPool(uint32_t num_threads, uint32_t queue_size) :
- _work_queue(queue_size),
- _shutdown(false) {
- for (int i = 0; i < num_threads; ++i) {
- _threads.create_thread(
- boost::bind<void>(boost::mem_fn(&ThreadPool::work_thread), this, i));
- }
- }
-
- // Destructor ensures that all threads are terminated before this object is freed
- // otherwise they may continue to run and reference member variables
- ~ThreadPool() {
- shutdown();
- join();
- }
-
- // Blocking operation that puts a work item on the queue. If the queue is full, blocks
- // until there is capacity available.
- //
- // 'work' is copied into the work queue, but may be referenced at any time in the
- // future. Therefore the caller needs to ensure that any data referenced by work (if T
- // is, e.g., a pointer type) remains valid until work has been processed, and it's up to
- // the caller to provide their own signalling mechanism to detect this (or to wait until
- // after DrainAndShutdown returns).
- //
- // Returns true if the work item was successfully added to the queue, false otherwise
- // (which typically means that the thread pool has already been shut down).
- bool offer(WorkFunction func) {
- return _work_queue.blocking_put(func);
- }
-
- // Shuts the thread pool down, causing the work queue to cease accepting offered work
- // and the worker threads to terminate once they have processed their current work item.
- // Returns once the shutdown flag has been set, does not wait for the threads to
- // terminate.
- void shutdown() {
- {
- boost::lock_guard<boost::mutex> l(_lock);
- _shutdown = true;
- }
- _work_queue.shutdown();
- }
-
- // Blocks until all threads are finished. Shutdown does not need to have been called,
- // since it may be called on a separate thread.
- void join() {
- _threads.join_all();
- }
-
- uint32_t get_queue_size() const {
- return _work_queue.get_size();
- }
-
- // Blocks until the work queue is empty, and then calls Shutdown to stop the worker
- // threads and Join to wait until they are finished.
- // Any work Offer()'ed during drain_and_shutdown may or may not be processed.
- void drain_and_shutdown() {
- {
- boost::unique_lock<boost::mutex> l(_lock);
-
- while (_work_queue.get_size() != 0) {
- _empty_cv.wait(l);
- }
- }
- shutdown();
- join();
- }
-
-private:
- // Driver method for each thread in the pool. Continues to read work from the queue
- // until the pool is shutdown.
- void work_thread(int thread_id) {
- while (!is_shutdown()) {
- WorkFunction work_function;
-
- if (_work_queue.blocking_get(&work_function)) {
- work_function();
- }
-
- if (_work_queue.get_size() == 0) {
- _empty_cv.notify_all();
- }
- }
- }
-
- // Returns value of _shutdown under a lock, forcing visibility to threads in the pool.
- bool is_shutdown() {
- boost::lock_guard<boost::mutex> l(_lock);
- return _shutdown;
- }
-
- // Queue on which work items are held until a thread is available to process them in
- // FIFO order.
- BlockingQueue<WorkFunction> _work_queue;
-
- // Collection of worker threads that process work from the queue.
- boost::thread_group _threads;
-
- // Guards _shutdown and _empty_cv
- boost::mutex _lock;
-
- // Set to true when threads should stop doing work and terminate.
- bool _shutdown;
-
- // Signalled when the queue becomes empty
- boost::condition_variable _empty_cv;
-};
-
-}
-
-#endif
diff --git a/be/test/olap/skiplist_test.cpp b/be/test/olap/skiplist_test.cpp
index 0e82465..bfaf738 100644
--- a/be/test/olap/skiplist_test.cpp
+++ b/be/test/olap/skiplist_test.cpp
@@ -28,7 +28,7 @@
#include "util/random.h"
#include "util/condition_variable.h"
#include "util/mutex.h"
-#include "util/thread_pool.hpp"
+#include "util/priority_thread_pool.hpp"
namespace doris {
@@ -411,7 +411,7 @@ static void run_concurrent(int run) {
Random rnd(seed);
const int N = 1000;
const int kSize = 1000;
- ThreadPool thread_pool(10, 100);
+ PriorityThreadPool thread_pool(10, 100);
for (int i = 0; i < N; i++) {
if ((i % 100) == 0) {
fprintf(stderr, "Run %d of %d\n", i, N);
diff --git a/be/test/runtime/load_channel_mgr_test.cpp b/be/test/runtime/load_channel_mgr_test.cpp
index 8417a13..bda51c4 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -32,6 +32,7 @@
#include "runtime/descriptor_helper.h"
#include "util/thrift_util.h"
#include "olap/delta_writer.h"
+#include "olap/memtable_flush_executor.h"
#include "olap/schema.h"
#include "olap/storage_engine.h"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org