Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/17 13:15:10 UTC

[doris] branch dev-1.0.1 updated: [opt](compaction) optimize compaction in concurrent load (#10153)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.0.1 by this push:
     new 2e04e4cc63 [opt](compaction) optimize compaction in concurrent load (#10153)
2e04e4cc63 is described below

commit 2e04e4cc63399d8d64fc5e16955369ef8d35cb44
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Fri Jun 17 17:49:45 2022 +0800

    [opt](compaction) optimize compaction in concurrent load (#10153)
    
    Add some logic to optimize compaction:
    1. Separate base and cumulative compaction into different thread pools, so
    that a long-running base compaction does not hold up cumulative compaction.
    2. Fix the level sizes in cumulative compaction so that files below 64 MB
    get a correct level size. When choosing rowsets to compact, the policy
    ignores big rowsets, which reduces CPU usage by about 25% under
    high-frequency concurrent load.
    3. Remove the skip window restriction so that a rowset can be compacted
    right after it is generated, because rowsets are not deleted after
    compaction. This greatly reduces the compaction score under concurrent
    load.
    4. Remove the version consistency check in can_do_compaction. Compaction
    picks consecutive rowsets anyway, so this check is redundant.
    
    With the changes above, compaction score and CPU cost improve
    substantially under concurrent load.
    
    Co-authored-by: yixiutt <yi...@selectdb.com>
---
 be/src/common/config.h                             |   7 +-
 be/src/olap/cumulative_compaction.cpp              |   6 +-
 be/src/olap/cumulative_compaction_policy.cpp       |  12 +--
 be/src/olap/cumulative_compaction_policy.h         |   2 -
 be/src/olap/olap_server.cpp                        | 120 ++++++++++++++-------
 be/src/olap/storage_engine.cpp                     |   7 +-
 be/src/olap/storage_engine.h                       |   5 +-
 be/src/olap/tablet.cpp                             |  19 +---
 be/src/olap/tablet.h                               |   2 +-
 be/test/olap/cumulative_compaction_policy_test.cpp |  24 ++---
 docs/en/administrator-guide/config/be_config.md    |  14 ++-
 docs/zh-CN/administrator-guide/config/be_config.md |  14 ++-
 12 files changed, 134 insertions(+), 98 deletions(-)
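
Note on the level-size change in cumulative_compaction_policy.cpp: the sketch below is an illustration only, not code from this commit. It mirrors the halving loop in the SizeBasedCumulativeCompactionPolicy constructor and compares the old floor (the 64 MB lower bound) with the new floor of 1024 bytes. The helper names build_levels and level_of are hypothetical, the lookup rule (largest level not exceeding the rowset size, 0 if none) is an assumption about how the levels are consumed, and the 1 GiB promotion size in the usage note is an assumed value chosen for the example.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical helper: collect level sizes by halving, starting from
// promotion_size / 2 and stopping once the value drops below floor_size.
// The s > 0 guard only protects against a non-positive floor_size.
std::vector<int64_t> build_levels(int64_t promotion_size, int64_t floor_size) {
    std::vector<int64_t> levels;
    for (int64_t s = promotion_size / 2; s >= floor_size && s > 0; s /= 2) {
        levels.push_back(s);
    }
    return levels;
}

// Hypothetical lookup (assumed rule): return the largest level that does not
// exceed size, or 0 when size is smaller than every level.
int64_t level_of(const std::vector<int64_t>& levels, int64_t size) {
    for (int64_t level : levels) {
        if (size >= level) {
            return level;
        }
    }
    return 0;
}
```

With an assumed 1 GiB promotion size, build_levels(1 GiB, 64 MiB) yields four levels (512, 256, 128 and 64 MiB), so under the assumed lookup every rowset smaller than 64 MiB falls into level 0. build_levels(1 GiB, 1024) yields twenty levels down to 1 KiB, and a 10 MiB rowset lands in the 8 MiB level.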

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 29fed299bc..dd769e55da 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -277,17 +277,14 @@ CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");
 // cumulative compaction policy: min and max delta file's number
 CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
 CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");
-// cumulative compaction skips recently published deltas in order to prevent
-// compacting a version that might be queried (in case the query planning phase took some time).
-// the following config set the window size
-CONF_mInt32(cumulative_compaction_skip_window_seconds, "30");
 
 // if compaction of a tablet failed, this tablet should not be chosen to
 // compaction until this interval passes.
 CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds
 
 // This config can be set to limit thread number in compaction thread pool.
-CONF_mInt32(max_compaction_threads, "10");
+CONF_mInt32(max_base_compaction_threads, "4");
+CONF_mInt32(max_cumu_compaction_threads, "10");
 
 // This config can be set to limit thread number in convert rowset thread pool.
 CONF_mInt32(convert_rowset_thread_num, "0");
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 67efa07363..864a7ec0a6 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -96,15 +96,13 @@ OLAPStatus CumulativeCompaction::execute_compact_impl() {
 OLAPStatus CumulativeCompaction::pick_rowsets_to_compact() {
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(
-            config::cumulative_compaction_skip_window_seconds, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     if (candidate_rowsets.empty()) {
         return OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION;
     }
 
-    // candidate_rowsets may not be continuous. Because some rowset may not be selected
-    // because the protection time has not expired(config::cumulative_compaction_skip_window_seconds).
+    // candidate_rowsets may not be continuous
     // So we need to choose the longest continuous path from it.
     std::vector<Version> missing_versions;
     RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions));
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index 2a18a1f1ac..67e4d4aa08 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -32,10 +32,11 @@ SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
           _size_based_promotion_ratio(size_based_promotion_ratio),
           _size_based_promotion_min_size(size_based_promotion_min_size),
           _size_based_compaction_lower_bound_size(size_based_compaction_lower_bound_size) {
-    // init _levels by divide 2 between size_based_promotion_size and size_based_compaction_lower_bound_size
+    // init _levels by divide 2 between size_based_compaction_lower_bound_size and 1K
+    // cu compaction handle file size less then size_based_compaction_lower_bound_size
     int64_t i_size = size_based_promotion_size / 2;
 
-    while (i_size >= size_based_compaction_lower_bound_size) {
+    while (i_size >= 1024) {
         _levels.push_back(i_size);
         i_size /= 2;
     }
@@ -458,16 +459,11 @@ void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(
 }
 
 void CumulativeCompactionPolicy::pick_candidate_rowsets(
-        int64_t skip_window_sec,
         const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
         int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) {
-    int64_t now = UnixSeconds();
     for (auto& it : rs_version_map) {
         // find all rowset version greater than cumulative_point and skip the create time in skip_window_sec
-        if (it.first.first >= cumulative_point
-            && ((it.second->creation_time() + skip_window_sec < now)
-            // this case means a rowset has been compacted before which is not a new published rowset, so it should participate compaction
-            || (it.first.first != it.first.second))) {
+        if (it.first.first >= cumulative_point) {
             candidate_rowsets->push_back(it.second);
         }
     }
diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h
index 2ff414566e..e9963670ce 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -70,12 +70,10 @@ public:
 
     /// This function implements the policy which represents how to pick the candidate rowsets for compaction.
     /// This base class gives a unified implementation. Its derived classes also can override this function each other.
-    /// param skip_window_sec, it means skipping the rowsets which use create time plus skip_window_sec is greater than now.
     /// param rs_version_map, mapping from version to rowset
     /// param cumulative_point,  current cumulative point of tablet
     /// return candidate_rowsets, the container of candidate rowsets
     virtual void pick_candidate_rowsets(
-            int64_t skip_window_sec,
             const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
             int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets);
 
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index c4b157de28..2829f21bd2 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -67,11 +67,16 @@ Status StorageEngine::start_bg_threads() {
         data_dirs.push_back(tmp_store.second);
     }
 
-    int32_t max_thread_num = config::max_compaction_threads;
-    ThreadPoolBuilder("CompactionTaskThreadPool")
+    int32_t max_thread_num = config::max_base_compaction_threads;
+    ThreadPoolBuilder("BaseCompactionTaskThreadPool")
             .set_min_threads(max_thread_num)
             .set_max_threads(max_thread_num)
-            .build(&_compaction_thread_pool);
+            .build(&_base_compaction_thread_pool);
+    max_thread_num = config::max_cumu_compaction_threads;
+    ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+            .set_min_threads(max_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_cumu_compaction_thread_pool);
 
     int32_t convert_rowset_thread_num = config::convert_rowset_thread_num;
     if (convert_rowset_thread_num > 0) {
@@ -88,11 +93,14 @@ Status StorageEngine::start_bg_threads() {
         LOG(INFO) << "alpha rowset scan thread started";
     }
 
-    ThreadPoolBuilder("CompactionTaskThreadPool")
-            .set_min_threads(max_thread_num)
-            .set_max_threads(max_thread_num)
-            .build(&_compaction_thread_pool);
-
+    ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+            .set_min_threads(config::max_base_compaction_threads)
+            .set_max_threads(config::max_base_compaction_threads)
+            .build(&_base_compaction_thread_pool);
+    ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+            .set_min_threads(config::max_cumu_compaction_threads)
+            .set_max_threads(config::max_cumu_compaction_threads)
+            .build(&_cumu_compaction_thread_pool);
     ThreadPoolBuilder("SmallCompactionTaskThreadPool")
             .set_min_threads(config::quick_compaction_max_threads)
             .set_max_threads(config::quick_compaction_max_threads)
@@ -371,6 +379,46 @@ void StorageEngine::_alpha_rowset_scan_thread_callback() {
     } while (!_stop_background_threads_latch.wait_for(MonoDelta::FromSeconds(scan_interval_sec)));
 }
 
+void StorageEngine::_adjust_compaction_thread_num() {
+    if (_base_compaction_thread_pool->max_threads() != config::max_base_compaction_threads) {
+        int old_max_threads = _base_compaction_thread_pool->max_threads();
+        Status status =
+                _base_compaction_thread_pool->set_max_threads(config::max_base_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
+                        << " to " << config::max_base_compaction_threads;
+        }
+    }
+    if (_base_compaction_thread_pool->min_threads() != config::max_base_compaction_threads) {
+        int old_min_threads = _base_compaction_thread_pool->min_threads();
+        Status status =
+                _base_compaction_thread_pool->set_min_threads(config::max_base_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
+                        << " to " << config::max_base_compaction_threads;
+        }
+    }
+
+    if (_cumu_compaction_thread_pool->max_threads() != config::max_cumu_compaction_threads) {
+        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+        Status status =
+                _cumu_compaction_thread_pool->set_max_threads(config::max_cumu_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
+                        << " to " << config::max_cumu_compaction_threads;
+        }
+    }
+    if (_cumu_compaction_thread_pool->min_threads() != config::max_cumu_compaction_threads) {
+        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+        Status status =
+                _cumu_compaction_thread_pool->set_min_threads(config::max_cumu_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
+                        << " to " << config::max_cumu_compaction_threads;
+        }
+    }
+}
+
 void StorageEngine::_compaction_tasks_producer_callback() {
 #ifdef GOOGLE_PROFILER
     ProfilerRegisterThread();
@@ -401,29 +449,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
     int64_t interval = config::generate_compaction_tasks_min_interval_ms;
     do {
         if (!config::disable_auto_compaction) {
-            VLOG_CRITICAL << "compaction thread pool. num_threads: " << _compaction_thread_pool->num_threads()
-                      << ", num_threads_pending_start: " << _compaction_thread_pool->num_threads_pending_start()
-                      << ", num_active_threads: " << _compaction_thread_pool->num_active_threads()
-                      << ", max_threads: " << _compaction_thread_pool->max_threads()
-                      << ", min_threads: " << _compaction_thread_pool->min_threads()
-                      << ", num_total_queued_tasks: " << _compaction_thread_pool->get_queue_size();
-
-            if(_compaction_thread_pool->max_threads() != config::max_compaction_threads) {
-                int old_max_threads = _compaction_thread_pool->max_threads();
-                Status status = _compaction_thread_pool->set_max_threads(config::max_compaction_threads);
-                if (status.ok()) {
-                    LOG(INFO) << "update compaction thread pool max_threads from "
-                              << old_max_threads << " to " << config::max_compaction_threads;
-                }
-            }
-            if(_compaction_thread_pool->min_threads() != config::max_compaction_threads) {
-                int old_min_threads = _compaction_thread_pool->min_threads();
-                Status status = _compaction_thread_pool->set_min_threads(config::max_compaction_threads);
-                if (status.ok()) {
-                    LOG(INFO) << "update compaction thread pool min_threads from "
-                              << old_min_threads << " to " << config::max_compaction_threads;
-                }
-            }
+            _adjust_compaction_thread_num();
 
             bool check_score = false;
             int64_t cur_time = UnixMillis();
@@ -442,6 +468,20 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                     last_base_score_update_time = cur_time;
                 }
             }
+            std::unique_ptr<ThreadPool>& thread_pool =
+                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                            ? _cumu_compaction_thread_pool
+                            : _base_compaction_thread_pool;
+            VLOG_CRITICAL << "compaction thread pool. type: "
+                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+                                                                                       : "BASE")
+                          << ", num_threads: " << thread_pool->num_threads()
+                          << ", num_threads_pending_start: "
+                          << thread_pool->num_threads_pending_start()
+                          << ", num_active_threads: " << thread_pool->num_active_threads()
+                          << ", max_threads: " << thread_pool->max_threads()
+                          << ", min_threads: " << thread_pool->min_threads()
+                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
             std::vector<TabletSharedPtr> tablets_compaction =
                     _generate_compaction_tasks(compaction_type, data_dirs, check_score);
             if (tablets_compaction.size() == 0) {
@@ -619,13 +659,17 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, Compaction
     int64_t permits = 0;
     Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits);
     if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
-        auto st = _compaction_thread_pool->submit_func([=]() {
-          CgroupsMgr::apply_system_cgroup();
-          tablet->execute_compaction(compaction_type);
-          _permit_limiter.release(permits);
-          // reset compaction
-          tablet->reset_compaction(compaction_type);
-          _pop_tablet_from_submitted_compaction(tablet, compaction_type);
+        std::unique_ptr<ThreadPool>& thread_pool =
+                (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                        ? _cumu_compaction_thread_pool
+                        : _base_compaction_thread_pool;
+        auto st = thread_pool->submit_func([=]() {
+            CgroupsMgr::apply_system_cgroup();
+            tablet->execute_compaction(compaction_type);
+            _permit_limiter.release(permits);
+            // reset compaction
+            tablet->reset_compaction(compaction_type);
+            _pop_tablet_from_submitted_compaction(tablet, compaction_type);
         });
         if (!st.ok()) {
             _permit_limiter.release(permits);
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 7513765931..58b3c8bca0 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -145,8 +145,11 @@ StorageEngine::~StorageEngine() {
     DEREGISTER_HOOK_METRIC(compaction_mem_consumption);
     _clear();
 
-    if (_compaction_thread_pool) {
-        _compaction_thread_pool->shutdown();
+    if (_base_compaction_thread_pool) {
+        _base_compaction_thread_pool->shutdown();
+    }
+    if (_cumu_compaction_thread_pool) {
+        _cumu_compaction_thread_pool->shutdown();
     }
     if (_convert_rowset_thread_pool) {
         _convert_rowset_thread_pool->shutdown();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index fff1f952ec..18a6960030 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -271,6 +271,8 @@ private:
 
     Status _handle_quick_compaction(TabletSharedPtr);
 
+    void _adjust_compaction_thread_num();
+
 private:
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -367,8 +369,9 @@ private:
 
     HeartbeatFlags* _heartbeat_flags;
 
-    std::unique_ptr<ThreadPool> _compaction_thread_pool;
     std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
 
     scoped_refptr<Thread> _alpha_rowset_scan_thread;
     std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 855177c86a..61c727769a 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -723,21 +723,6 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
         return false;
     }
 
-    if (tablet_state() == TABLET_RUNNING) {
-        // if tablet state is running, we need to check if it has consistent versions.
-        // tablet in other state such as TABLET_NOTREADY may not have complete versions.
-        ReadLock rdlock(_meta_lock);
-        const RowsetSharedPtr lastest_delta = rowset_with_max_version();
-        if (lastest_delta == nullptr) {
-            return false;
-        }
-
-        Version test_version = Version(0, lastest_delta->end_version());
-        if (OLAP_SUCCESS != capture_consistent_versions(test_version, nullptr)) {
-            return false;
-        }
-    }
-
     if (tablet_state() == TABLET_NOTREADY) {
         // Before doing schema change, tablet's rowsets that versions smaller than max converting version will be
         // removed. So, we only need to do the compaction when it is being converted.
@@ -1103,12 +1088,12 @@ TabletInfo Tablet::get_tablet_info() const {
 }
 
 void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
-        int64_t skip_window_sec, std::vector<RowsetSharedPtr>* candidate_rowsets) {
+        std::vector<RowsetSharedPtr>* candidate_rowsets) {
     if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
         return;
     }
     ReadLock rdlock(_meta_lock);
-    _cumulative_compaction_policy->pick_candidate_rowsets(skip_window_sec, _rs_version_map,
+    _cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map,
                                                           _cumulative_point, candidate_rowsets);
 }
 
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 303780f712..d4e9d54693 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -219,7 +219,7 @@ public:
     TabletInfo get_tablet_info() const;
 
     void pick_candidate_rowsets_to_cumulative_compaction(
-            int64_t skip_window_sec, std::vector<RowsetSharedPtr>* candidate_rowsets);
+            std::vector<RowsetSharedPtr>* candidate_rowsets);
     void pick_candidate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
 
     void calculate_cumulative_point();
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index 950cc18920..4866bf9c4c 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -247,7 +247,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     ASSERT_EQ(2, candidate_rowsets.size());
 }
@@ -267,7 +267,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
     NumBasedCumulativeCompactionPolicy policy;
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -296,7 +296,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
     NumBasedCumulativeCompactionPolicy policy;
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -739,7 +739,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     ASSERT_EQ(3, candidate_rowsets.size());
 }
@@ -757,7 +757,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base)
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     ASSERT_EQ(3, candidate_rowsets.size());
 }
@@ -776,7 +776,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -805,7 +805,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -834,7 +834,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -863,7 +863,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -892,7 +892,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -921,7 +921,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
@@ -950,7 +950,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version{-1, -1};
diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md
index 8afd14f61a..e979b33766 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -66,11 +66,11 @@ There are two ways to configure BE configuration items:
 
 ## Examples
 
-1. Modify `max_compaction_concurrency` statically
+1. Modify `max_base_compaction_concurrency` statically
 
      By adding in the `be.conf` file:
 
-     ```max_compaction_concurrency=5```
+     ```max_base_compaction_concurrency=5```
 
      Then restart the BE process to take effect the configuration.
 
@@ -730,10 +730,16 @@ Default:10
 
 The maximum number of client caches per host. There are multiple client caches in BE, but currently we use the same cache size configuration. If necessary, use different configurations to set up different client-side caches
 
-### `max_compaction_threads`
+### `max_base_compaction_threads`
 
 * Type: int32
-* Description: The maximum of thread number in compaction thread pool.
+* Description: The maximum of thread number in base compaction thread pool.
+* Default value: 4
+
+### `max_cumu_compaction_threads`
+
+* Type: int32
+* Description: The maximum of thread number in cumulative compaction thread pool.
 * Default value: 10
 
 ### `max_consumer_num_per_group`
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md
index abbfc3b20b..bfe7f93285 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -64,11 +64,11 @@ BE 的配置项有两种方式进行配置:
 
 ## 应用举例
 
-1. 静态方式修改 `max_compaction_concurrency`
+1. 静态方式修改 `max_base_compaction_concurrency`
 
 	通过在 `be.conf` 文件中添加:
 
-	```max_compaction_concurrency=5```
+  ```max_base_compaction_concurrency=5```
 
 	之后重启 BE 进程以生效该配置。
 
@@ -731,10 +731,16 @@ load错误日志将在此时间后删除
 
 每个主机的最大客户端缓存数,BE 中有多种客户端缓存,但目前我们使用相同的缓存大小配置。 如有必要,使用不同的配置来设置不同的客户端缓存。
 
-### `max_compaction_threads`
+### `max_base_compaction_threads`
 
 * 类型:int32
-* 描述:Compaction线程池中线程数量的最大值。
+* 描述:Base Compaction线程池中线程数量的最大值。
+* 默认值:4
+
+### `max_cumu_compaction_threads`
+
+* 类型:int32
+* 描述:Cumulative Compaction线程池中线程数量的最大值。
 * 默认值:10
 
 ### `max_consumer_num_per_group`

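Note on the thread pool split: the sketch below is an illustration only and does not use the Doris ThreadPool class. It shows the routing rule that olap_server.cpp applies in this commit, where cumulative compaction tasks go to one pool and every other compaction type goes to the base pool. PoolConfig and CompactionPools are hypothetical stand-ins; the pool names and default sizes (4 and 10) are taken from the diff above.

```cpp
#include <string>

enum class CompactionType { BASE_COMPACTION, CUMULATIVE_COMPACTION };

// Hypothetical stand-in for a thread pool: only its name and size limit.
struct PoolConfig {
    std::string name;
    int max_threads;
};

// Hypothetical holder for the two pools introduced by the commit.
struct CompactionPools {
    PoolConfig base{"BaseCompactionTaskThreadPool", 4};
    PoolConfig cumu{"CumuCompactionTaskThreadPool", 10};

    // Same ternary as in the diff: cumulative tasks use the cumu pool,
    // anything else falls through to the base pool.
    const PoolConfig& select(CompactionType type) const {
        return type == CompactionType::CUMULATIVE_COMPACTION ? cumu : base;
    }
};
```

Because each pool has its own limit, a backlog of slow base compactions can occupy at most the base pool's threads, which appears to be the intent of item 1 in the commit message.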
