Posted to commits@doris.apache.org by da...@apache.org on 2022/06/17 09:49:50 UTC

[doris] branch master updated: [opt](compaction) optimize compaction in concurrent load (#10153)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f35b235c3b [opt](compaction) optimize compaction in concurrent load (#10153)
f35b235c3b is described below

commit f35b235c3b1936f637faa0c1efee54623776e30c
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Fri Jun 17 17:49:45 2022 +0800

    [opt](compaction) optimize compaction in concurrent load (#10153)
    
    This change adds logic to optimize compaction:
    1. Separate base and cumulative compaction into their own thread pools, so that
    a long-running base compaction does not delay cumulative compaction.
    2. Fix the level sizes in cumulative compaction so that files below 64 MB get a
    correct level size. When choosing rowsets to compact, the policy ignores big
    rowsets; this reduces CPU usage by about 25% under high-frequency concurrent load.
    3. Remove the skip-window restriction so that a rowset can be compacted right
    after it is generated, because rowsets are not deleted after compaction. This
    greatly reduces the compaction score under concurrent load.
    4. Remove the version consistency check in can_do_compaction. Compaction always
    picks a consecutive run of rowsets, so the check is unnecessary.
    
    With these changes, compaction score and CPU cost improve substantially under
    concurrent load.
    
    Co-authored-by: yixiutt <yi...@selectdb.com>
---
 be/src/common/config.h                             |   7 +-
 be/src/olap/cumulative_compaction.cpp              |   6 +-
 be/src/olap/cumulative_compaction_policy.cpp       |  12 +--
 be/src/olap/cumulative_compaction_policy.h         |   2 -
 be/src/olap/olap_server.cpp                        | 114 ++++++++++++++-------
 be/src/olap/storage_engine.cpp                     |   7 +-
 be/src/olap/storage_engine.h                       |   5 +-
 be/src/olap/tablet.cpp                             |  21 +---
 be/src/olap/tablet.h                               |   2 +-
 be/test/olap/cumulative_compaction_policy_test.cpp |  25 +++--
 docs/en/docs/admin-manual/config/be-config.md      |  14 ++-
 docs/zh-CN/docs/admin-manual/config/be-config.md   |  14 ++-
 12 files changed, 129 insertions(+), 100 deletions(-)

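Item 2 of the commit message changes where the size-based policy stops halving when it builds its level table: the loop now runs down to 1 KB instead of stopping at the lower bound size (64 MB by default). The following is a minimal standalone sketch of that loop, not the Doris code itself; `build_levels` and its `floor_size` parameter are hypothetical names, and the 1 GiB promotion size is inferred from the `536870912` first level expected by the unit test.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper mirroring the constructor loop in
// SizeBasedCumulativeCompactionPolicy: start at half the promotion size and
// keep halving while the value is still >= floor_size.
std::vector<int64_t> build_levels(int64_t promotion_size, int64_t floor_size) {
    assert(floor_size > 0);  // a floor of 0 would never terminate the loop
    std::vector<int64_t> levels;
    for (int64_t size = promotion_size / 2; size >= floor_size; size /= 2) {
        levels.push_back(size);
    }
    return levels;
}
```

Under these assumptions, `build_levels(1073741824, 64 * 1024 * 1024)` yields the 4 levels that the removed `EXPECT_EQ(4, ...)` assertion checked, while `build_levels(1073741824, 1024)` yields 20 levels, so rowsets smaller than 64 MB now land in distinct levels instead of sharing none.
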
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 4885c17193..56e7382320 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -285,17 +285,14 @@ CONF_mInt64(cumulative_size_based_compaction_lower_size_mbytes, "64");
 // cumulative compaction policy: min and max delta file's number
 CONF_mInt64(min_cumulative_compaction_num_singleton_deltas, "5");
 CONF_mInt64(max_cumulative_compaction_num_singleton_deltas, "1000");
-// cumulative compaction skips recently published deltas in order to prevent
-// compacting a version that might be queried (in case the query planning phase took some time).
-// the following config set the window size
-CONF_mInt32(cumulative_compaction_skip_window_seconds, "30");
 
 // if compaction of a tablet failed, this tablet should not be chosen to
 // compaction until this interval passes.
 CONF_mInt64(min_compaction_failure_interval_sec, "5"); // 5 seconds
 
 // This config can be set to limit thread number in compaction thread pool.
-CONF_mInt32(max_compaction_threads, "10");
+CONF_mInt32(max_base_compaction_threads, "4");
+CONF_mInt32(max_cumu_compaction_threads, "10");
 
 // This config can be set to limit thread number in convert rowset thread pool.
 CONF_mInt32(convert_rowset_thread_num, "0");
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index df8dbc50f1..35329e1362 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -95,15 +95,13 @@ Status CumulativeCompaction::execute_compact_impl() {
 Status CumulativeCompaction::pick_rowsets_to_compact() {
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(
-            config::cumulative_compaction_skip_window_seconds, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     if (candidate_rowsets.empty()) {
         return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_NO_SUITABLE_VERSION);
     }
 
-    // candidate_rowsets may not be continuous. Because some rowset may not be selected
-    // because the protection time has not expired(config::cumulative_compaction_skip_window_seconds).
+    // candidate_rowsets may not be continuous
     // So we need to choose the longest continuous path from it.
     std::vector<Version> missing_versions;
     RETURN_NOT_OK(find_longest_consecutive_version(&candidate_rowsets, &missing_versions));
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index a77eb44f73..ad73e1fc80 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -32,10 +32,11 @@ SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
           _size_based_promotion_ratio(size_based_promotion_ratio),
           _size_based_promotion_min_size(size_based_promotion_min_size),
           _size_based_compaction_lower_bound_size(size_based_compaction_lower_bound_size) {
-    // init _levels by divide 2 between size_based_promotion_size and size_based_compaction_lower_bound_size
+    // init _levels by divide 2 between size_based_compaction_lower_bound_size and 1K
+    // cu compaction handle file size less then size_based_compaction_lower_bound_size
     int64_t i_size = size_based_promotion_size / 2;
 
-    while (i_size >= size_based_compaction_lower_bound_size) {
+    while (i_size >= 1024) {
         _levels.push_back(i_size);
         i_size /= 2;
     }
@@ -460,16 +461,11 @@ void NumBasedCumulativeCompactionPolicy::calculate_cumulative_point(
 }
 
 void CumulativeCompactionPolicy::pick_candidate_rowsets(
-        int64_t skip_window_sec,
         const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
         int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) {
-    int64_t now = UnixSeconds();
     for (auto& it : rs_version_map) {
         // find all rowset version greater than cumulative_point and skip the create time in skip_window_sec
-        if (it.first.first >= cumulative_point &&
-            ((it.second->creation_time() + skip_window_sec < now)
-             // this case means a rowset has been compacted before which is not a new published rowset, so it should participate compaction
-             || (it.first.first != it.first.second))) {
+        if (it.first.first >= cumulative_point) {
             candidate_rowsets->push_back(it.second);
         }
     }
diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h
index 0f59d52834..079155c6b3 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -69,12 +69,10 @@ public:
 
     /// This function implements the policy which represents how to pick the candidate rowsets for compaction.
     /// This base class gives a unified implementation. Its derived classes also can override this function each other.
-    /// param skip_window_sec, it means skipping the rowsets which use create time plus skip_window_sec is greater than now.
     /// param rs_version_map, mapping from version to rowset
     /// param cumulative_point,  current cumulative point of tablet
     /// return candidate_rowsets, the container of candidate rowsets
     virtual void pick_candidate_rowsets(
-            int64_t skip_window_sec,
             const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
             int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets);
 
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index eb1d47c065..5088bd8775 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -67,11 +67,16 @@ Status StorageEngine::start_bg_threads() {
         data_dirs.push_back(tmp_store.second);
     }
 
-    int32_t max_thread_num = config::max_compaction_threads;
-    ThreadPoolBuilder("CompactionTaskThreadPool")
+    int32_t max_thread_num = config::max_base_compaction_threads;
+    ThreadPoolBuilder("BaseCompactionTaskThreadPool")
             .set_min_threads(max_thread_num)
             .set_max_threads(max_thread_num)
-            .build(&_compaction_thread_pool);
+            .build(&_base_compaction_thread_pool);
+    max_thread_num = config::max_cumu_compaction_threads;
+    ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+            .set_min_threads(max_thread_num)
+            .set_max_threads(max_thread_num)
+            .build(&_cumu_compaction_thread_pool);
 
     int32_t convert_rowset_thread_num = config::convert_rowset_thread_num;
     if (convert_rowset_thread_num > 0) {
@@ -88,11 +93,14 @@ Status StorageEngine::start_bg_threads() {
         LOG(INFO) << "alpha rowset scan thread started";
     }
 
-    ThreadPoolBuilder("CompactionTaskThreadPool")
-            .set_min_threads(max_thread_num)
-            .set_max_threads(max_thread_num)
-            .build(&_compaction_thread_pool);
-
+    ThreadPoolBuilder("BaseCompactionTaskThreadPool")
+            .set_min_threads(config::max_base_compaction_threads)
+            .set_max_threads(config::max_base_compaction_threads)
+            .build(&_base_compaction_thread_pool);
+    ThreadPoolBuilder("CumuCompactionTaskThreadPool")
+            .set_min_threads(config::max_cumu_compaction_threads)
+            .set_max_threads(config::max_cumu_compaction_threads)
+            .build(&_cumu_compaction_thread_pool);
     ThreadPoolBuilder("SmallCompactionTaskThreadPool")
             .set_min_threads(config::quick_compaction_max_threads)
             .set_max_threads(config::quick_compaction_max_threads)
@@ -364,6 +372,46 @@ void StorageEngine::_alpha_rowset_scan_thread_callback() {
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(scan_interval_sec)));
 }
 
+void StorageEngine::_adjust_compaction_thread_num() {
+    if (_base_compaction_thread_pool->max_threads() != config::max_base_compaction_threads) {
+        int old_max_threads = _base_compaction_thread_pool->max_threads();
+        Status status =
+                _base_compaction_thread_pool->set_max_threads(config::max_base_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads
+                        << " to " << config::max_base_compaction_threads;
+        }
+    }
+    if (_base_compaction_thread_pool->min_threads() != config::max_base_compaction_threads) {
+        int old_min_threads = _base_compaction_thread_pool->min_threads();
+        Status status =
+                _base_compaction_thread_pool->set_min_threads(config::max_base_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads
+                        << " to " << config::max_base_compaction_threads;
+        }
+    }
+
+    if (_cumu_compaction_thread_pool->max_threads() != config::max_cumu_compaction_threads) {
+        int old_max_threads = _cumu_compaction_thread_pool->max_threads();
+        Status status =
+                _cumu_compaction_thread_pool->set_max_threads(config::max_cumu_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads
+                        << " to " << config::max_cumu_compaction_threads;
+        }
+    }
+    if (_cumu_compaction_thread_pool->min_threads() != config::max_cumu_compaction_threads) {
+        int old_min_threads = _cumu_compaction_thread_pool->min_threads();
+        Status status =
+                _cumu_compaction_thread_pool->set_min_threads(config::max_cumu_compaction_threads);
+        if (status.ok()) {
+            VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads
+                        << " to " << config::max_cumu_compaction_threads;
+        }
+    }
+}
+
 void StorageEngine::_compaction_tasks_producer_callback() {
 #ifdef GOOGLE_PROFILER
     ProfilerRegisterThread();
@@ -394,35 +442,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
     int64_t interval = config::generate_compaction_tasks_min_interval_ms;
     do {
         if (!config::disable_auto_compaction) {
-            VLOG_CRITICAL << "compaction thread pool. num_threads: "
-                          << _compaction_thread_pool->num_threads()
-                          << ", num_threads_pending_start: "
-                          << _compaction_thread_pool->num_threads_pending_start()
-                          << ", num_active_threads: "
-                          << _compaction_thread_pool->num_active_threads()
-                          << ", max_threads: " << _compaction_thread_pool->max_threads()
-                          << ", min_threads: " << _compaction_thread_pool->min_threads()
-                          << ", num_total_queued_tasks: "
-                          << _compaction_thread_pool->get_queue_size();
-
-            if (_compaction_thread_pool->max_threads() != config::max_compaction_threads) {
-                int old_max_threads = _compaction_thread_pool->max_threads();
-                Status status =
-                        _compaction_thread_pool->set_max_threads(config::max_compaction_threads);
-                if (status.ok()) {
-                    LOG(INFO) << "update compaction thread pool max_threads from "
-                              << old_max_threads << " to " << config::max_compaction_threads;
-                }
-            }
-            if (_compaction_thread_pool->min_threads() != config::max_compaction_threads) {
-                int old_min_threads = _compaction_thread_pool->min_threads();
-                Status status =
-                        _compaction_thread_pool->set_min_threads(config::max_compaction_threads);
-                if (status.ok()) {
-                    LOG(INFO) << "update compaction thread pool min_threads from "
-                              << old_min_threads << " to " << config::max_compaction_threads;
-                }
-            }
+            _adjust_compaction_thread_num();
 
             bool check_score = false;
             int64_t cur_time = UnixMillis();
@@ -441,6 +461,20 @@ void StorageEngine::_compaction_tasks_producer_callback() {
                     last_base_score_update_time = cur_time;
                 }
             }
+            std::unique_ptr<ThreadPool>& thread_pool =
+                    (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                            ? _cumu_compaction_thread_pool
+                            : _base_compaction_thread_pool;
+            VLOG_CRITICAL << "compaction thread pool. type: "
+                          << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU"
+                                                                                       : "BASE")
+                          << ", num_threads: " << thread_pool->num_threads()
+                          << ", num_threads_pending_start: "
+                          << thread_pool->num_threads_pending_start()
+                          << ", num_active_threads: " << thread_pool->num_active_threads()
+                          << ", max_threads: " << thread_pool->max_threads()
+                          << ", min_threads: " << thread_pool->min_threads()
+                          << ", num_total_queued_tasks: " << thread_pool->get_queue_size();
             std::vector<TabletSharedPtr> tablets_compaction =
                     _generate_compaction_tasks(compaction_type, data_dirs, check_score);
             if (tablets_compaction.size() == 0) {
@@ -623,7 +657,11 @@ Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet,
     int64_t permits = 0;
     Status st = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet, &permits);
     if (st.ok() && permits > 0 && _permit_limiter.request(permits)) {
-        auto st = _compaction_thread_pool->submit_func([=]() {
+        std::unique_ptr<ThreadPool>& thread_pool =
+                (compaction_type == CompactionType::CUMULATIVE_COMPACTION)
+                        ? _cumu_compaction_thread_pool
+                        : _base_compaction_thread_pool;
+        auto st = thread_pool->submit_func([=]() {
             SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::COMPACTION,
                                       tablet->get_compaction_mem_tracker(compaction_type));
             CgroupsMgr::apply_system_cgroup();
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index c872851352..5a75a4f88d 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -157,8 +157,11 @@ StorageEngine::~StorageEngine() {
     DEREGISTER_HOOK_METRIC(schema_change_mem_consumption);
     _clear();
 
-    if (_compaction_thread_pool) {
-        _compaction_thread_pool->shutdown();
+    if (_base_compaction_thread_pool) {
+        _base_compaction_thread_pool->shutdown();
+    }
+    if (_cumu_compaction_thread_pool) {
+        _cumu_compaction_thread_pool->shutdown();
     }
     if (_convert_rowset_thread_pool) {
         _convert_rowset_thread_pool->shutdown();
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 40d4303dab..e54a8d5f91 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -274,6 +274,8 @@ private:
 
     Status _handle_quick_compaction(TabletSharedPtr);
 
+    void _adjust_compaction_thread_num();
+
 private:
     struct CompactionCandidate {
         CompactionCandidate(uint32_t nicumulative_compaction_, int64_t tablet_id_, uint32_t index_)
@@ -381,8 +383,9 @@ private:
 
     HeartbeatFlags* _heartbeat_flags;
 
-    std::unique_ptr<ThreadPool> _compaction_thread_pool;
     std::unique_ptr<ThreadPool> _quick_compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
+    std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
 
     scoped_refptr<Thread> _alpha_rowset_scan_thread;
     std::unique_ptr<ThreadPool> _convert_rowset_thread_pool;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 771069e4ce..2db820b975 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -713,21 +713,6 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
         return false;
     }
 
-    if (tablet_state() == TABLET_RUNNING) {
-        // if tablet state is running, we need to check if it has consistent versions.
-        // tablet in other state such as TABLET_NOTREADY may not have complete versions.
-        std::shared_lock rdlock(_meta_lock);
-        const RowsetSharedPtr lastest_delta = rowset_with_max_version();
-        if (lastest_delta == nullptr) {
-            return false;
-        }
-
-        Version test_version = Version(0, lastest_delta->end_version());
-        if (!capture_consistent_versions(test_version, nullptr)) {
-            return false;
-        }
-    }
-
     if (tablet_state() == TABLET_NOTREADY) {
         // Before doing schema change, tablet's rowsets that versions smaller than max converting version will be
         // removed. So, we only need to do the compaction when it is being converted.
@@ -1092,13 +1077,13 @@ TabletInfo Tablet::get_tablet_info() const {
 }
 
 void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
-        int64_t skip_window_sec, std::vector<RowsetSharedPtr>* candidate_rowsets) {
+        std::vector<RowsetSharedPtr>* candidate_rowsets) {
     if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
         return;
     }
     std::shared_lock rdlock(_meta_lock);
-    _cumulative_compaction_policy->pick_candidate_rowsets(skip_window_sec, _rs_version_map,
-                                                          _cumulative_point, candidate_rowsets);
+    _cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map, _cumulative_point,
+                                                          candidate_rowsets);
 }
 
 void Tablet::find_alpha_rowsets(std::vector<RowsetSharedPtr>* rowsets) const {
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 1156de36f1..e0f124988d 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -211,7 +211,7 @@ public:
     TabletInfo get_tablet_info() const;
 
     void pick_candidate_rowsets_to_cumulative_compaction(
-            int64_t skip_window_sec, std::vector<RowsetSharedPtr>* candidate_rowsets);
+            std::vector<RowsetSharedPtr>* candidate_rowsets);
     void pick_candidate_rowsets_to_base_compaction(std::vector<RowsetSharedPtr>* candidate_rowsets);
 
     void calculate_cumulative_point();
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index 76a5acaf36..92d3dda5bd 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -256,7 +256,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     EXPECT_EQ(2, candidate_rowsets.size());
 }
@@ -279,7 +279,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
     NumBasedCumulativeCompactionPolicy policy;
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -311,7 +311,7 @@ TEST_F(TestNumBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
     NumBasedCumulativeCompactionPolicy policy;
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -769,7 +769,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     EXPECT_EQ(3, candidate_rowsets.size());
 }
@@ -790,7 +790,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base)
     _tablet->calculate_cumulative_point();
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     EXPECT_EQ(3, candidate_rowsets.size());
 }
@@ -812,7 +812,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -844,7 +844,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -876,7 +876,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -908,7 +908,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -940,7 +940,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -972,7 +972,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -1004,7 +1004,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
 
     std::vector<RowsetSharedPtr> candidate_rowsets;
 
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(1000, &candidate_rowsets);
+    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets);
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -1081,7 +1081,6 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, _level_size) {
             dynamic_cast<SizeBasedCumulativeCompactionPolicy*>(
                     _tablet->_cumulative_compaction_policy.get());
 
-    EXPECT_EQ(4, policy->_levels.size());
     EXPECT_EQ(536870912, policy->_levels[0]);
     EXPECT_EQ(268435456, policy->_levels[1]);
     EXPECT_EQ(134217728, policy->_levels[2]);
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index a75e263a81..043ffb7cc3 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -66,11 +66,11 @@ There are two ways to configure BE configuration items:
 
 ## Examples
 
-1. Modify `max_compaction_concurrency` statically
+1. Modify `max_base_compaction_concurrency` statically
 
      By adding in the `be.conf` file:
 
-     ```max_compaction_concurrency=5```
+     ```max_base_compaction_concurrency=5```
 
      Then restart the BE process to take effect the configuration.
 
@@ -736,10 +736,16 @@ Default: 10
 
 The maximum number of client caches per host. There are multiple client caches in BE, but currently we use the same cache size configuration. If necessary, use different configurations to set up different client-side caches
 
-### `max_compaction_threads`
+### `max_base_compaction_threads`
 
 * Type: int32
-* Description: The maximum of thread number in compaction thread pool.
+* Description: The maximum of thread number in base compaction thread pool.
+* Default value: 4
+
+### `max_cumu_compaction_threads`
+
+* Type: int32
+* Description: The maximum of thread number in cumulative compaction thread pool.
 * Default value: 10
 
 ### `max_consumer_num_per_group`
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 45dd774269..5d9a4ffbb6 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -64,11 +64,11 @@ BE 的配置项有两种方式进行配置:
 
 ## 应用举例
 
-1. 静态方式修改 `max_compaction_concurrency`
+1. 静态方式修改 `max_base_compaction_concurrency`
 
   通过在 `be.conf` 文件中添加:
 
-  ```max_compaction_concurrency=5```
+  ```max_base_compaction_concurrency=5```
 
   之后重启 BE 进程以生效该配置。
 
@@ -737,10 +737,16 @@ load错误日志将在此时间后删除
 
 每个主机的最大客户端缓存数,BE 中有多种客户端缓存,但目前我们使用相同的缓存大小配置。 如有必要,使用不同的配置来设置不同的客户端缓存。
 
-### `max_compaction_threads`
+### `max_base_compaction_threads`
 
 * 类型:int32
-* 描述:Compaction线程池中线程数量的最大值。
+* 描述:Base Compaction线程池中线程数量的最大值。
+* 默认值:4
+
+### `max_cumu_compaction_threads`
+
+* 类型:int32
+* 描述:Cumulative Compaction线程池中线程数量的最大值。
 * 默认值:10
 
 ### `max_consumer_num_per_group`

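The effect of removing the skip window (item 3 of the commit message) can be illustrated with a small standalone comparison of the old and new candidate filters from `CumulativeCompactionPolicy::pick_candidate_rowsets`. This is a sketch under stated assumptions: `RowsetStub` and both function names are hypothetical stand-ins, and only the filter conditions are taken from the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a rowset: a version range plus a creation time.
struct RowsetStub {
    int64_t start_version;
    int64_t end_version;
    int64_t creation_time;  // seconds
};

// Old filter: a singleton rowset is skipped until it is older than the window;
// a rowset spanning several versions (already compacted once) always passes.
std::vector<RowsetStub> pick_with_skip_window(const std::vector<RowsetStub>& all,
                                              int64_t cumulative_point,
                                              int64_t skip_window_sec, int64_t now) {
    std::vector<RowsetStub> picked;
    for (const RowsetStub& rs : all) {
        bool old_enough = rs.creation_time + skip_window_sec < now;
        bool already_compacted = rs.start_version != rs.end_version;
        if (rs.start_version >= cumulative_point && (old_enough || already_compacted)) {
            picked.push_back(rs);
        }
    }
    return picked;
}

// New filter: every rowset at or beyond the cumulative point is a candidate.
std::vector<RowsetStub> pick_after_point(const std::vector<RowsetStub>& all,
                                         int64_t cumulative_point) {
    std::vector<RowsetStub> picked;
    for (const RowsetStub& rs : all) {
        if (rs.start_version >= cumulative_point) {
            picked.push_back(rs);
        }
    }
    return picked;
}
```

With freshly loaded singleton rowsets, the old filter leaves them out for up to 30 seconds (the removed default), which lets them pile up under frequent loads; the new filter makes them eligible immediately.
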

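The pool split from item 1 of the commit message comes down to routing each task by its compaction type, as `_submit_compaction_task` does in the diff. The sketch below shows only that routing; `PoolStub` and `EngineStub` are hypothetical stand-ins that count submissions rather than run threads, and the thread limits are the new defaults from `config.h`.

```cpp
#include <cassert>
#include <string>

enum class CompactionType { BASE_COMPACTION, CUMULATIVE_COMPACTION };

// Hypothetical stand-in for a thread pool: it records submissions only.
struct PoolStub {
    std::string name;
    int max_threads;
    int submitted;
};

// Hypothetical stand-in for the storage engine's two compaction pools.
struct EngineStub {
    PoolStub base{"BaseCompactionTaskThreadPool", 4, 0};
    PoolStub cumu{"CumuCompactionTaskThreadPool", 10, 0};

    // Same selection as the ternary in the diff: cumulative tasks go to the
    // cumulative pool, everything else goes to the base pool.
    PoolStub& pool_for(CompactionType type) {
        return type == CompactionType::CUMULATIVE_COMPACTION ? cumu : base;
    }

    void submit(CompactionType type) { ++pool_for(type).submitted; }
};
```

Because the pools are independent, a backlog of base compaction tasks can occupy at most the base pool's threads and leaves the cumulative pool's capacity untouched.
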