You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/04/23 01:48:50 UTC
[incubator-doris] branch master updated: [Optimize] Optimize the
assign logic of compaction tasks to avoid starvation (#5683)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 12b2447 [Optimize] Optimize the assign logic of compaction tasks to avoid starvation (#5683)
12b2447 is described below
commit 12b24477243bf92543dd2b5f818d20b994328016
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Apr 23 09:48:37 2021 +0800
[Optimize] Optimize the assign logic of compaction tasks to avoid starvation (#5683)
1. Reserve a slot to ensure that the cumulative compaction can be executed.
2. Ensure that the compaction score metric can be updated.
---
be/src/common/config.h | 4 +++
be/src/olap/olap_server.cpp | 77 ++++++++++++++++++++++++++++++++++--------
be/src/olap/storage_engine.h | 7 ++--
be/src/olap/tablet_manager.cpp | 9 +++--
be/src/olap/tablet_manager.h | 2 +-
5 files changed, 75 insertions(+), 24 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0c9fd35..3c04c3a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -296,7 +296,11 @@ CONF_mInt64(total_permits_for_compaction_score, "10000");
CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10")
// Compaction task number per disk.
+// Must be at least 2 (i.e. >= 2), because Base compaction and Cumulative compaction have at least one thread each.
CONF_mInt32(compaction_task_num_per_disk, "2");
+CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool {
+ return config >= 2;
+});
// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index ed72dc1..488ac63 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -320,7 +320,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
#endif
LOG(INFO) << "try to start compaction producer process!";
- std::vector<TTabletId> tablet_submitted;
+ std::map<TTabletId, CompactionType> tablet_submitted;
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
@@ -330,18 +330,37 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int round = 0;
CompactionType compaction_type;
+ // Used to record the time when the score metric was last updated.
+ // The update of the score metric is accompanied by the logic of selecting the tablet.
+ // If there is no slot available, the logic of selecting the tablet will be terminated,
+ // which causes the score metric update to be terminated.
+ // In order to avoid this situation, we need to update the score regularly.
+ int64_t last_cumulative_score_update_time = 0;
+ int64_t last_base_score_update_time = 0;
+ static const int64_t check_score_interval_ms = 5000; // 5 secs
+
int64_t interval = config::generate_compaction_tasks_min_interval_ms;
do {
if (!config::disable_auto_compaction) {
+ bool check_score = false;
+ int64_t cur_time = UnixMillis();
if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
compaction_type = CompactionType::CUMULATIVE_COMPACTION;
round++;
+ if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
+ check_score = true;
+ last_cumulative_score_update_time = cur_time;
+ }
} else {
compaction_type = CompactionType::BASE_COMPACTION;
round = 0;
+ if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
+ check_score = true;
+ last_base_score_update_time = cur_time;
+ }
}
std::vector<TabletSharedPtr> tablets_compaction =
- _compaction_tasks_generator(compaction_type, data_dirs);
+ _compaction_tasks_generator(compaction_type, data_dirs, check_score);
if (tablets_compaction.size() == 0) {
std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
_wakeup_producer_flag = 0;
@@ -361,7 +380,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet);
if (permits > 0 && _permit_limiter.request(permits)) {
// Push to _tablet_submitted_compaction before submitting task
- _push_tablet_into_submitted_compaction(tablet);
+ _push_tablet_into_submitted_compaction(tablet, compaction_type);
auto st =_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
@@ -389,15 +408,43 @@ void StorageEngine::_compaction_tasks_producer_callback() {
}
std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
- CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
+ CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
std::vector<TabletSharedPtr> tablets_compaction;
uint32_t max_compaction_score = 0;
+ bool need_pick_tablet = true;
std::random_shuffle(data_dirs.begin(), data_dirs.end());
for (auto data_dir : data_dirs) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
+ // We need to reserve at least one Slot for cumulative compaction.
+ // So when there is only one Slot, we have to judge whether there is a cumulative compaction
+ // in the currently submitted task.
+ // If so, the last Slot can be assigned to Base compaction,
+ // otherwise, this Slot needs to be reserved for cumulative compaction.
if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk) {
- continue;
+ // Return if no available slot
+ need_pick_tablet = false;
+ if (!check_score) {
+ continue;
+ }
+ } else if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk - 1) {
+ // Only one slot left, check if it can be assigned to base compaction task.
+ if (compaction_type == CompactionType::BASE_COMPACTION) {
+ bool has_cumu_submitted = false;
+ for (const auto& submitted : _tablet_submitted_compaction[data_dir]) {
+ if (submitted.second == CompactionType::CUMULATIVE_COMPACTION) {
+ has_cumu_submitted = true;
+ break;
+ }
+ }
+ if (!has_cumu_submitted) {
+ need_pick_tablet = false;
+ if (!check_score) {
+ continue;
+ }
+ }
+ }
}
+
if (!data_dir->reach_capacity_limit(0)) {
uint32_t disk_max_score = 0;
TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
@@ -408,6 +455,7 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
}
}
}
+
if (!tablets_compaction.empty()) {
if (compaction_type == CompactionType::BASE_COMPACTION) {
DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(max_compaction_score);
@@ -415,24 +463,23 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(max_compaction_score);
}
}
+ if (!need_pick_tablet) {
+ // This is just for updating the compaction score metric, no need to return tablet.
+ tablets_compaction.clear();
+ }
return tablets_compaction;
}
-void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) {
+void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
- _tablet_submitted_compaction[tablet->data_dir()].emplace_back(
- tablet->tablet_id());
+ _tablet_submitted_compaction[tablet->data_dir()].emplace(
+ tablet->tablet_id(), compaction_type);
}
void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
- std::vector<TTabletId>::iterator it_tablet =
- find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
- _tablet_submitted_compaction[tablet->data_dir()].end(),
- tablet->tablet_id());
- if (it_tablet !=
- _tablet_submitted_compaction[tablet->data_dir()].end()) {
- _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
+ int removed = _tablet_submitted_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+ if (removed == 1) {
std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index bc06db2..ef76624 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -241,8 +241,9 @@ private:
void _compaction_tasks_producer_callback();
vector<TabletSharedPtr> _compaction_tasks_generator(CompactionType compaction_type,
- std::vector<DataDir*> data_dirs);
- void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet);
+ std::vector<DataDir*>& data_dirs,
+ bool check_score);
+ void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type);
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet);
Status _init_stream_load_recorder(const std::string& stream_load_record_path);
@@ -347,7 +348,7 @@ private:
CompactionPermitLimiter _permit_limiter;
std::mutex _tablet_submitted_compaction_mutex;
- std::map<DataDir*, vector<TTabletId>> _tablet_submitted_compaction;
+ std::map<DataDir*, std::map<TTabletId, CompactionType>> _tablet_submitted_compaction;
AtomicInt32 _wakeup_producer_flag;
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index b39b375..cedeabb 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -697,7 +697,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
- std::vector<TTabletId>& tablet_submitted_compaction, uint32_t* score) {
+ const std::map<TTabletId, CompactionType>& tablet_submitted_compaction, uint32_t* score) {
int64_t now_ms = UnixMillis();
const string& compaction_type_str =
compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
@@ -709,12 +709,11 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
ReadLock rlock(tablets_shard.lock.get());
for (const auto& tablet_map : tablets_shard.tablet_map) {
for (const TabletSharedPtr& tablet_ptr : tablet_map.second.table_arr) {
- std::vector<TTabletId>::iterator it_tablet =
- find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(),
- tablet_ptr->tablet_id());
- if (it_tablet != tablet_submitted_compaction.end()) {
+ auto search = tablet_submitted_compaction.find(tablet_ptr->tablet_id());
+ if (search != tablet_submitted_compaction.end()) {
continue;
}
+
AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task();
if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED &&
cur_alter_task->alter_state() != ALTER_FAILED) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 3427b93..c021d75 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -71,7 +71,7 @@ public:
TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type,
DataDir* data_dir,
- vector<TTabletId>& tablet_submitted_compaction,
+ const std::map<TTabletId, CompactionType>& tablet_submitted_compaction,
uint32_t* score);
TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org