You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/10 00:32:23 UTC
[doris] branch master updated: [enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ab186a60ce [enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631
ab186a60ce is described below
commit ab186a60ce80d4f168b503cdba1b3ab740067d59
Author: plat1ko <pl...@gmail.com>
AuthorDate: Tue Jan 10 08:32:15 2023 +0800
[enhancement](compaction) Optimize judging delete rowset and picking candidate rowsets for compaction #15631
Tablet::version_for_delete_predicate has to traverse all rowset metas in the tablet meta, which has O(N) complexity; however, we can directly judge whether a rowset is a delete rowset via RowsetMeta::has_delete_predicate, which has O(1) complexity.
Since we no longer call Tablet::version_for_delete_predicate when picking input rowsets for compaction, we can shrink the critical section guarded by Tablet::_meta_lock.
---
be/src/olap/base_compaction.cpp | 5 +--
be/src/olap/compaction.cpp | 2 +-
be/src/olap/cumulative_compaction.cpp | 5 +--
be/src/olap/cumulative_compaction_policy.cpp | 18 ++-------
be/src/olap/cumulative_compaction_policy.h | 9 -----
be/src/olap/tablet.cpp | 39 ++++++++++++-------
be/src/olap/tablet.h | 8 +---
be/test/olap/cumulative_compaction_policy_test.cpp | 45 +++++-----------------
8 files changed, 42 insertions(+), 89 deletions(-)
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index ea4188788a..36fdb193de 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -106,10 +106,7 @@ void BaseCompaction::_filter_input_rowset() {
}
Status BaseCompaction::pick_rowsets_to_compact() {
- _input_rowsets.clear();
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_base_compaction(&_input_rowsets, rdlock);
- std::sort(_input_rowsets.begin(), _input_rowsets.end(), Rowset::comparator);
+ _input_rowsets = _tablet->pick_candidate_rowsets_to_base_compaction();
RETURN_NOT_OK(check_version_continuity(_input_rowsets));
RETURN_NOT_OK(_check_rowset_overlapping(_input_rowsets));
_filter_input_rowset();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 2089461e12..e2c7473f5f 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -202,7 +202,7 @@ bool Compaction::handle_ordered_data_compaction() {
// has a delete version, use original compaction
if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
for (auto rowset : _input_rowsets) {
- if (_tablet->version_for_delete_predicate(rowset->version())) {
+ if (rowset->rowset_meta()->has_delete_predicate()) {
return false;
}
}
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 6feb8c5c1c..765790859c 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -96,10 +96,7 @@ Status CumulativeCompaction::execute_compact_impl() {
}
Status CumulativeCompaction::pick_rowsets_to_compact() {
- std::vector<RowsetSharedPtr> candidate_rowsets;
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
-
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
if (candidate_rowsets.empty()) {
return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>();
}
diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp
index 3ccbe0eea7..a097ce11b2 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -85,7 +85,7 @@ void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
break;
}
- bool is_delete = tablet->version_for_delete_predicate(rs->version());
+ bool is_delete = rs->has_delete_predicate();
// break the loop if segments in this rowset is overlapping.
if (!is_delete && rs->is_segments_overlapping()) {
@@ -245,10 +245,9 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
int transient_size = 0;
*compaction_score = 0;
int64_t total_size = 0;
- for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
- RowsetSharedPtr rowset = candidate_rowsets[i];
+ for (auto& rowset : candidate_rowsets) {
// check whether this rowset is delete version
- if (tablet->version_for_delete_predicate(rowset->version())) {
+ if (rowset->rowset_meta()->has_delete_predicate()) {
*last_delete_version = rowset->version();
if (!input_rowsets->empty()) {
// we meet a delete version, and there were other versions before.
@@ -344,17 +343,6 @@ int SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
return 0;
}
-void CumulativeCompactionPolicy::pick_candidate_rowsets(
- const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
- int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets) {
- for (const auto& [version, rs] : rs_version_map) {
- if (version.first >= cumulative_point && rs->is_local()) {
- candidate_rowsets->push_back(rs);
- }
- }
- std::sort(candidate_rowsets->begin(), candidate_rowsets->end(), Rowset::comparator);
-}
-
std::shared_ptr<CumulativeCompactionPolicy>
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy() {
return std::unique_ptr<CumulativeCompactionPolicy>(new SizeBasedCumulativeCompactionPolicy());
diff --git a/be/src/olap/cumulative_compaction_policy.h b/be/src/olap/cumulative_compaction_policy.h
index fabd9344c1..16ac7fc549 100644
--- a/be/src/olap/cumulative_compaction_policy.h
+++ b/be/src/olap/cumulative_compaction_policy.h
@@ -58,15 +58,6 @@ public:
Tablet* tablet, TabletState state, const std::vector<RowsetMetaSharedPtr>& all_rowsets,
int64_t current_cumulative_point, uint32_t* score) = 0;
- /// This function implements the policy which represents how to pick the candidate rowsets for compaction.
- /// This base class gives a unified implementation. Its derived classes also can override this function each other.
- /// param rs_version_map, mapping from version to rowset
- /// param cumulative_point, current cumulative point of tablet
- /// return candidate_rowsets, the container of candidate rowsets
- virtual void pick_candidate_rowsets(
- const std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>& rs_version_map,
- int64_t cumulative_point, std::vector<RowsetSharedPtr>* candidate_rowsets);
-
/// Pick input rowsets from candidate rowsets for compaction. This function is pure virtual function.
/// Its implementation depends on concrete compaction policy.
/// param candidate_rowsets, the candidate_rowsets vector container to pick input rowsets
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index dae86a06e8..a67ace80a5 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -1125,25 +1125,36 @@ TabletInfo Tablet::get_tablet_info() const {
return TabletInfo(tablet_id(), schema_hash(), tablet_uid());
}
-void Tablet::pick_candidate_rowsets_to_cumulative_compaction(
- std::vector<RowsetSharedPtr>* candidate_rowsets,
- std::shared_lock<std::shared_mutex>& /* meta lock*/) {
+std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_cumulative_compaction() {
+ std::vector<RowsetSharedPtr> candidate_rowsets;
if (_cumulative_point == K_INVALID_CUMULATIVE_POINT) {
- return;
+ return candidate_rowsets;
+ }
+ {
+ std::shared_lock rlock(_meta_lock);
+ for (const auto& [version, rs] : _rs_version_map) {
+ if (version.first >= _cumulative_point && rs->is_local()) {
+ candidate_rowsets.push_back(rs);
+ }
+ }
}
- _cumulative_compaction_policy->pick_candidate_rowsets(_rs_version_map, _cumulative_point,
- candidate_rowsets);
+ std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
+ return candidate_rowsets;
}
-void Tablet::pick_candidate_rowsets_to_base_compaction(
- vector<RowsetSharedPtr>* candidate_rowsets,
- std::shared_lock<std::shared_mutex>& /* meta lock*/) {
- for (auto& it : _rs_version_map) {
- // Do compaction on local rowsets only.
- if (it.first.first < _cumulative_point && it.second->is_local()) {
- candidate_rowsets->push_back(it.second);
+std::vector<RowsetSharedPtr> Tablet::pick_candidate_rowsets_to_base_compaction() {
+ std::vector<RowsetSharedPtr> candidate_rowsets;
+ {
+ std::shared_lock rlock(_meta_lock);
+ for (const auto& [version, rs] : _rs_version_map) {
+ // Do compaction on local rowsets only.
+ if (version.first < _cumulative_point && rs->is_local()) {
+ candidate_rowsets.push_back(rs);
+ }
}
}
+ std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
+ return candidate_rowsets;
}
// For http compaction action
@@ -1173,7 +1184,7 @@ void Tablet::get_compaction_status(std::string* json_result) {
delete_flags.reserve(rowsets.size());
for (auto& rs : rowsets) {
- delete_flags.push_back(version_for_delete_predicate(rs->version()));
+ delete_flags.push_back(rs->rowset_meta()->has_delete_predicate());
}
// get snapshot version path json_doc
_timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 3adfdeee2a..c82fa461a1 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -225,12 +225,8 @@ public:
TabletInfo get_tablet_info() const;
- void pick_candidate_rowsets_to_cumulative_compaction(
- std::vector<RowsetSharedPtr>* candidate_rowsets,
- std::shared_lock<std::shared_mutex>& /* meta lock*/);
- void pick_candidate_rowsets_to_base_compaction(
- std::vector<RowsetSharedPtr>* candidate_rowsets,
- std::shared_lock<std::shared_mutex>& /* meta lock*/);
+ std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_cumulative_compaction();
+ std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
void calculate_cumulative_point();
// TODO(ygl):
diff --git a/be/test/olap/cumulative_compaction_policy_test.cpp b/be/test/olap/cumulative_compaction_policy_test.cpp
index 5be78e9190..f4760b9bf0 100644
--- a/be/test/olap/cumulative_compaction_policy_test.cpp
+++ b/be/test/olap/cumulative_compaction_policy_test.cpp
@@ -400,10 +400,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets) {
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
-
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
EXPECT_EQ(3, candidate_rowsets.size());
}
@@ -419,10 +416,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_candidate_rowsets_big_base)
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
-
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
EXPECT_EQ(3, candidate_rowsets.size());
}
@@ -438,10 +432,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_normal) {
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
-
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -468,10 +459,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_big_base) {
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
-
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -498,10 +486,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_promotion) {
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
-
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -528,10 +513,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_same_leve
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
-
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -558,10 +540,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_empty) {
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
-
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -588,10 +567,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_not_reach_min
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
-
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
@@ -618,10 +594,7 @@ TEST_F(TestSizeBasedCumulativeCompactionPolicy, pick_input_rowsets_delete) {
_tablet->init();
_tablet->calculate_cumulative_point();
- std::vector<RowsetSharedPtr> candidate_rowsets;
-
- std::shared_lock rdlock(_tablet->get_header_lock());
- _tablet->pick_candidate_rowsets_to_cumulative_compaction(&candidate_rowsets, rdlock);
+ auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
std::vector<RowsetSharedPtr> input_rowsets;
Version last_delete_version {-1, -1};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org