You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/10 00:32:23 UTC

[doris] branch master updated: [enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ab186a60ce [enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631
ab186a60ce is described below

commit ab186a60ce80d4f168b503cdba1b3ab740067d59
Author: plat1ko <pl...@gmail.com>
AuthorDate: Tue Jan 10 08:32:15 2023 +0800

    [enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631
    
    Tablet::version_for_delete_predicate has to traverse all rowset metas in the tablet meta, which has O(N) complexity; however, we can directly determine whether a rowset is a delete rowset via RowsetMeta::has_delete_predicate, which has O(1) complexity.
    Since we no longer call Tablet::version_for_delete_predicate when picking input rowsets for compaction, we can shrink the critical section guarded by Tablet::_meta_lock.
---
 be/src/olap/base_compaction.cpp                    |  5 +--
 be/src/olap/compaction.cpp                         |  2 +-
 be/src/olap/cumulative_compaction.cpp              |  5 +--
 be/src/olap/cumulative_compaction_policy.cpp       | 18 ++-------
 be/src/olap/cumulative_compaction_policy.h         |  9 -----
 be/src/olap/tablet.cpp                             | 39 ++++++++++++-------
 be/src/olap/tablet.h                               |  8 +---
 be/test/olap/cumulative_compaction_policy_test.cpp | 45 +++++-----------------
 8 files changed, 42 insertions(+), 89 deletions(-)

diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index ea4188788a..36fdb193de 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -106,10 +106,7 @@ void BaseCompaction::_filter_input_rowset() {
 }
 
 Status BaseCompaction::pick_rowsets_to_compact() {
-    _input_rowsets.clear();
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets, rdlock);
-    std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator);
+    _input_rowsets = _tablet->pick_candidate_rowsets_to_base_compaction();
     RETURN_NOT_OK(check_version_continuity(_input_rowsets));
     RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
     _filter_input_rowset();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 2089461e12..e2c7473f5f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -202,7 +202,7 @@ bool Compaction::handle_ordered_data_compaction() {
     // has a delete version, use original compaction
     if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
         for (auto rowset : _input_rowsets) {
-            if (_tablet->version_for_delete_predicate(rowset->version())) {
+            if (rowset->rowset_meta()->has_delete_predicate()) {
                 return false;
             }
         }
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 6feb8c5c1c..765790859c 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -96,10 +96,7 @@ Status CumulativeCompaction::execute_compact_impl() {
 }
 
 Status CumulativeCompaction::pick_rowsets_to_compact() {
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
-
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
     if (candidate_rowsets.empty()) {
         return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>();
     }
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index 3ccbe0eea7..a097ce11b2 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -85,7 +85,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
                 break;
             }
 
-            bool is_delete = tablet->version_for_delete_predicate(rs->version());
+            bool is_delete = rs->has_delete_predicate();
 
             // break the loop if segments in this rowset is overlapping.
             if (!is_delete && rs->is_segments_overlapping()) {
@@ -245,10 +245,9 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
     int transient_size = 0;
     *compaction_score = 0;
     int64_t total_size = 0;
-    for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
-        RowsetSharedPtr rowset = candidate_rowsets[i];
+    for (auto& rowset : candidate_rowsets) {
         // check whether this rowset is delete version
-        if (tablet->version_for_delete_predicate(rowset->version())) {
+        if (rowset->rowset_meta()->has_delete_predicate()) {
             *last_delete_version = rowset->version();
             if (!input_rowsets->empty()) {
                 // we meet a delete version, and there were other versions before.
@@ -344,17 +343,6 @@ int SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
     return 0;
 }
 
-void CumulativeCompactionPolicy::pick_candidate_rowsets(
-        const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
-        int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) {
-    for (const auto& [version, rs] : rs_version_map) {
-        if (version.first >= cumulative_point && rs->is_local()) {
-            candidate_rowsets->push_back(rs);
-        }
-    }
-    std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator);
-}
-
 std::shared_ptr<CumulativeCompactionPolicy>
 CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() {
     return std::unique_ptr<CumulativeCompactionPolicy>(new SizeBasedCumulativeCompactionPolicy());
diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h
index fabd9344c1..16ac7fc549 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -58,15 +58,6 @@ public:
             Tablet* tablet, TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets,
             int64_t current_cumulative_point, uint32_t* score) = 0;
 
-    /// This function implements the policy which represents how to pick the candidate rowsets for compaction.
-    /// This base class gives a unified implementation. Its derived classes also can override this function each other.
-    /// param rs_version_map, mapping from version to rowset
-    /// param cumulative_point,  current cumulative point of tablet
-    /// return candidate_rowsets, the container of candidate rowsets
-    virtual void pick_candidate_rowsets(
-            const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
-            int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets);
-
     /// Pick input rowsets from candidate rowsets for compaction. This function is pure virtual function.
     /// Its implementation depends on concrete compaction policy.
     /// param candidate_rowsets, the candidate_rowsets vector container to pick input rowsets
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index dae86a06e8..a67ace80a5 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1125,25 +1125,36 @@ TabletInfo Tablet::get_tablet_info() const {
     return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
 }
 
-void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
-        std::vector<RowsetSharedPtr>* candidate_rowsets,
-        std::shared_lock<std::shared_mutex>& /* meta lock*/) {
+std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compaction() {
+    std::vector<RowsetSharedPtr> candidate_rowsets;
     if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
-        return;
+        return candidate_rowsets;
+    }
+    {
+        std::shared_lock rlock(_meta_lock);
+        for (const auto& [version, rs] : _rs_version_map) {
+            if (version.first >= _cumulative_point && rs->is_local()) {
+                candidate_rowsets.push_back(rs);
+            }
+        }
     }
-    _cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map, _cumulative_point,
-                                                          candidate_rowsets);
+    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
+    return candidate_rowsets;
 }
 
-void Tablet::pick_candidate_rowsets_to_base_compaction(
-        vector<RowsetSharedPtr>* candidate_rowsets,
-        std::shared_lock<std::shared_mutex>& /* meta lock*/) {
-    for (auto& it : _rs_version_map) {
-        // Do compaction on local rowsets only.
-        if (it.first.first < _cumulative_point && it.second->is_local()) {
-            candidate_rowsets->push_back(it.second);
+std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() {
+    std::vector<RowsetSharedPtr> candidate_rowsets;
+    {
+        std::shared_lock rlock(_meta_lock);
+        for (const auto& [version, rs] : _rs_version_map) {
+            // Do compaction on local rowsets only.
+            if (version.first < _cumulative_point && rs->is_local()) {
+                candidate_rowsets.push_back(rs);
+            }
         }
     }
+    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
+    return candidate_rowsets;
 }
 
 // For http compaction action
@@ -1173,7 +1184,7 @@ void Tablet::get_compaction_status(std::string* json_result) {
 
         delete_flags.reserve(rowsets.size());
         for (auto& rs : rowsets) {
-            delete_flags.push_back(version_for_delete_predicate(rs->version()));
+            delete_flags.push_back(rs->rowset_meta()->has_delete_predicate());
         }
         // get snapshot version path json_doc
         _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 3adfdeee2a..c82fa461a1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -225,12 +225,8 @@ public:
 
     TabletInfo get_tablet_info() const;
 
-    void pick_candidate_rowsets_to_cumulative_compaction(
-            std::vector<RowsetSharedPtr>* candidate_rowsets,
-            std::shared_lock<std::shared_mutex>& /* meta lock*/);
-    void pick_candidate_rowsets_to_base_compaction(
-            std::vector<RowsetSharedPtr>* candidate_rowsets,
-            std::shared_lock<std::shared_mutex>& /* meta lock*/);
+    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_cumulative_compaction();
+    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
 
     void calculate_cumulative_point();
     // TODO(ygl):
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index 5be78e9190..f4760b9bf0 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -400,10 +400,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
-
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
     EXPECT_EQ(3, candidate_rowsets.size());
 }
 
@@ -419,10 +416,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base)
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
-
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
     EXPECT_EQ(3, candidate_rowsets.size());
 }
 
@@ -438,10 +432,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -468,10 +459,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) {
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -498,10 +486,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) {
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -528,10 +513,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -558,10 +540,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) {
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -588,10 +567,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};
@@ -618,10 +594,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
     _tablet->init();
     _tablet->calculate_cumulative_point();
 
-    std::vector<RowsetSharedPtr> candidate_rowsets;
-
-    std::shared_lock rdlock(_tablet->get_header_lock());
-    _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+    auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
 
     std::vector<RowsetSharedPtr> input_rowsets;
     Version last_delete_version {-1, -1};


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org