You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/04/23 01:48:50 UTC

[incubator-doris] branch master updated: [Optimize] Optimize the assign logic of compaction tasks to avoid starvation (#5683)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 12b2447  [Optimize] Optimize the assign logic of compaction tasks to avoid starvation (#5683)
12b2447 is described below

commit 12b24477243bf92543dd2b5f818d20b994328016
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Fri Apr 23 09:48:37 2021 +0800

    [Optimize] Optimize the assign logic of compaction tasks to avoid starvation (#5683)
    
    1. Reserve one slot per disk so that cumulative compaction can always be executed.
    2. Ensure that the compaction score metrics keep being updated even when no slot is available.
---
 be/src/common/config.h         |  4 +++
 be/src/olap/olap_server.cpp    | 77 ++++++++++++++++++++++++++++++++++--------
 be/src/olap/storage_engine.h   |  7 ++--
 be/src/olap/tablet_manager.cpp |  9 +++--
 be/src/olap/tablet_manager.h   |  2 +-
 5 files changed, 75 insertions(+), 24 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0c9fd35..3c04c3a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -296,7 +296,11 @@ CONF_mInt64(total_permits_for_compaction_score, "10000");
 CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10")
 
 // Compaction task number per disk.
+// Must be greater than or equal to 2, because base compaction and cumulative compaction each need at least one thread.
 CONF_mInt32(compaction_task_num_per_disk, "2");
+CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool {
+  return config >= 2;
+});
 
 // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
 CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index ed72dc1..488ac63 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -320,7 +320,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
 #endif
     LOG(INFO) << "try to start compaction producer process!";
 
-    std::vector<TTabletId> tablet_submitted;
+    std::map<TTabletId, CompactionType> tablet_submitted;
     std::vector<DataDir*> data_dirs;
     for (auto& tmp_store : _store_map) {
         data_dirs.push_back(tmp_store.second);
@@ -330,18 +330,37 @@ void StorageEngine::_compaction_tasks_producer_callback() {
     int round = 0;
     CompactionType compaction_type;
 
+    // Record when each compaction score metric was last updated.
+    // The score metric is updated as a side effect of selecting tablets for compaction.
+    // If no slot is available, tablet selection is skipped, and so the score metric
+    // would stop being updated as well.
+    // To avoid this, force a score check at least every check_score_interval_ms.
+    int64_t last_cumulative_score_update_time = 0;
+    int64_t last_base_score_update_time = 0;
+    static const int64_t check_score_interval_ms = 5000; // 5 secs
+
     int64_t interval = config::generate_compaction_tasks_min_interval_ms;
     do {
         if (!config::disable_auto_compaction) {
+            bool check_score = false;
+            int64_t cur_time = UnixMillis();
             if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
                 compaction_type = CompactionType::CUMULATIVE_COMPACTION;
                 round++;
+                if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
+                    check_score = true;
+                    last_cumulative_score_update_time = cur_time;
+                }
             } else {
                 compaction_type = CompactionType::BASE_COMPACTION;
                 round = 0;
+                if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
+                    check_score = true;
+                    last_base_score_update_time = cur_time;
+                }
             }
             std::vector<TabletSharedPtr> tablets_compaction =
-                    _compaction_tasks_generator(compaction_type, data_dirs);
+                    _compaction_tasks_generator(compaction_type, data_dirs, check_score);
             if (tablets_compaction.size() == 0) {
                 std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
                 _wakeup_producer_flag = 0;
@@ -361,7 +380,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                 int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet);
                 if (permits > 0 && _permit_limiter.request(permits)) {
                     // Push to _tablet_submitted_compaction before submitting task
-                    _push_tablet_into_submitted_compaction(tablet);
+                    _push_tablet_into_submitted_compaction(tablet, compaction_type);
                     auto st =_compaction_thread_pool->submit_func([=]() {
                       CgroupsMgr::apply_system_cgroup();
                       tablet->execute_compaction(compaction_type);
@@ -389,15 +408,43 @@ void StorageEngine::_compaction_tasks_producer_callback() {
 }
 
 std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
-        CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
+        CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
     std::vector<TabletSharedPtr> tablets_compaction;
     uint32_t max_compaction_score = 0;
+    bool need_pick_tablet = true;
     std::random_shuffle(data_dirs.begin(), data_dirs.end());
     for (auto data_dir : data_dirs) {
         std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
+        // Reserve at least one slot per disk for cumulative compaction.
+        // So when only one slot is left, check whether a cumulative compaction is already
+        // among the tasks submitted for this disk.
+        // If so, the last slot may be assigned to base compaction;
+        // otherwise, it must be kept for cumulative compaction.
         if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk) {
-            continue;
+            // No free slot on this disk: return no tablets, but keep scanning if the score must be checked.
+            need_pick_tablet = false;
+            if (!check_score) {
+                continue;
+            }
+        } else if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk - 1) {
+            // Only one slot left: check whether it may be assigned to a base compaction task.
+            if (compaction_type == CompactionType::BASE_COMPACTION) {
+                bool has_cumu_submitted = false;
+                for (const auto& submitted : _tablet_submitted_compaction[data_dir]) {
+                    if (submitted.second == CompactionType::CUMULATIVE_COMPACTION) {
+                        has_cumu_submitted = true;
+                        break;
+                    }
+                }
+                if (!has_cumu_submitted) {
+                    need_pick_tablet = false;
+                    if (!check_score) {
+                        continue;
+                    }
+                }
+            }
         }
+
         if (!data_dir->reach_capacity_limit(0)) {
             uint32_t disk_max_score = 0;
             TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
@@ -408,6 +455,7 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
             }
         }
     }
+
     if (!tablets_compaction.empty()) {
         if (compaction_type == CompactionType::BASE_COMPACTION) {
             DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(max_compaction_score);
@@ -415,24 +463,23 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
             DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(max_compaction_score);
         }
     }
+    if (!need_pick_tablet) {
+        // The scan was only needed to update the compaction score metric; return no tablets.
+        tablets_compaction.clear();
+    }
     return tablets_compaction;
 }
 
-void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) {
+void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type) {
     std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
-    _tablet_submitted_compaction[tablet->data_dir()].emplace_back(
-            tablet->tablet_id());
+    _tablet_submitted_compaction[tablet->data_dir()].emplace(
+            tablet->tablet_id(), compaction_type);
 }
 
 void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) {
     std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
-    std::vector<TTabletId>::iterator it_tablet =
-            find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
-                 _tablet_submitted_compaction[tablet->data_dir()].end(),
-                 tablet->tablet_id());
-    if (it_tablet !=
-        _tablet_submitted_compaction[tablet->data_dir()].end()) {
-        _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
+    int removed = _tablet_submitted_compaction[tablet->data_dir()].erase(tablet->tablet_id());
+    if (removed == 1) {
         std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
         _wakeup_producer_flag = 1;
         _compaction_producer_sleep_cv.notify_one();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index bc06db2..ef76624 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -241,8 +241,9 @@ private:
 
     void _compaction_tasks_producer_callback();
     vector<TabletSharedPtr> _compaction_tasks_generator(CompactionType compaction_type,
-                                                        std::vector<DataDir*> data_dirs);
-    void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet);
+                                                        std::vector<DataDir*>& data_dirs,
+                                                        bool check_score);
+    void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type);
     void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet);
 
     Status _init_stream_load_recorder(const std::string& stream_load_record_path);
@@ -347,7 +348,7 @@ private:
     CompactionPermitLimiter _permit_limiter;
 
     std::mutex _tablet_submitted_compaction_mutex;
-    std::map<DataDir*, vector<TTabletId>> _tablet_submitted_compaction;
+    std::map<DataDir*, std::map<TTabletId, CompactionType>> _tablet_submitted_compaction;
 
     AtomicInt32 _wakeup_producer_flag;
 
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index b39b375..cedeabb 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -697,7 +697,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {
 
 TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
         CompactionType compaction_type, DataDir* data_dir,
-        std::vector<TTabletId>& tablet_submitted_compaction, uint32_t* score) {
+        const std::map<TTabletId, CompactionType>& tablet_submitted_compaction, uint32_t* score) {
     int64_t now_ms = UnixMillis();
     const string& compaction_type_str =
             compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
@@ -709,12 +709,11 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
         ReadLock rlock(tablets_shard.lock.get());
         for (const auto& tablet_map : tablets_shard.tablet_map) {
             for (const TabletSharedPtr& tablet_ptr : tablet_map.second.table_arr) {
-                std::vector<TTabletId>::iterator it_tablet =
-                        find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(),
-                             tablet_ptr->tablet_id());
-                if (it_tablet != tablet_submitted_compaction.end()) {
+                auto search = tablet_submitted_compaction.find(tablet_ptr->tablet_id());
+                if (search != tablet_submitted_compaction.end()) {
                     continue;
                 }
+
                 AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task();
                 if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED &&
                     cur_alter_task->alter_state() != ALTER_FAILED) {
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 3427b93..c021d75 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -71,7 +71,7 @@ public:
 
     TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type,
                                                    DataDir* data_dir,
-                                                   vector<TTabletId>& tablet_submitted_compaction,
+                                                   const std::map<TTabletId, CompactionType>& tablet_submitted_compaction,
                                                    uint32_t* score);
 
     TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org