You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/18 11:26:09 UTC
[doris] branch master updated: [bugfix](merge-on-write) optimize rowset tree and tablet header lock (#20911)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 48065fce19 [bugfix](merge-on-write) optimize rowset tree and tablet header lock (#20911)
48065fce19 is described below
commit 48065fce19170d7d3eb5c0d5ae023f560fac6277
Author: Xin Liao <li...@126.com>
AuthorDate: Sun Jun 18 19:26:02 2023 +0800
[bugfix](merge-on-write) optimize rowset tree and tablet header lock (#20911)
---
be/src/olap/delta_writer.cpp | 4 +-
be/src/olap/memtable.cpp | 17 +-
be/src/olap/rowset/rowset_meta.h | 3 +
be/src/olap/rowset/segment_v2/segment.cpp | 1 +
be/src/olap/rowset/segment_v2/segment_writer.cpp | 8 +-
be/src/olap/tablet.cpp | 299 +++++++++++------------
be/src/olap/tablet.h | 22 +-
be/src/olap/tablet_meta.cpp | 1 +
be/src/service/point_query_executor.cpp | 10 +-
be/test/olap/tablet_test.cpp | 62 -----
10 files changed, 191 insertions(+), 236 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 394950a193..df9167484b 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -454,11 +454,9 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments,
_delete_bitmap));
}
- int64_t cur_max_version = _tablet->max_version().second;
RETURN_IF_ERROR(_tablet->commit_phase_update_delete_bitmap(
- _cur_rowset, _rowset_ids, _delete_bitmap, cur_max_version, segments, _req.txn_id,
+ _cur_rowset, _rowset_ids, _delete_bitmap, segments, _req.txn_id,
_rowset_writer.get()));
- _rowset_ids = _tablet->all_rs_id(cur_max_version);
}
Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
_req.load_id, _cur_rowset, false);
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2cdc440848..ace9f4f689 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -453,15 +453,18 @@ Status MemTable::_generate_delete_bitmap(int32_t segment_id) {
auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments));
- std::shared_lock meta_rlock(_tablet->get_header_lock());
- // tablet is under alter process. The delete bitmap will be calculated after conversion.
- if (_tablet->tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
- return Status::OK();
+ std::vector<RowsetSharedPtr> specified_rowsets;
+ {
+ std::shared_lock meta_rlock(_tablet->get_header_lock());
+ // tablet is under alter process. The delete bitmap will be calculated after conversion.
+ if (_tablet->tablet_state() == TABLET_NOTREADY &&
+ SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
+ return Status::OK();
+ }
+ specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
}
-
OlapStopWatch watch;
- RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, &_mow_context->rowset_ids,
+ RETURN_IF_ERROR(_tablet->calc_delete_bitmap(rowset, segments, specified_rowsets,
_mow_context->delete_bitmap,
_mow_context->max_version));
size_t total_rows = std::accumulate(
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 6736c53cac..68bb1e7075 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -321,6 +321,9 @@ public:
segments_key_bounds->push_back(key_range);
}
}
+
+ auto& get_segments_key_bounds() { return _rowset_meta_pb.segments_key_bounds(); }
+
virtual bool get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
// for compatibility, old version has not segment key bounds
if (_rowset_meta_pb.segments_key_bounds_size() == 0) {
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index 527f8391cc..e046785bb0 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -378,6 +378,7 @@ Status Segment::lookup_row_key(const Slice& key, bool with_seq_col, RowLocation*
}
row_location->row_id = index_iterator->get_current_ordinal();
row_location->segment_id = _segment_id;
+ row_location->rowset_id = _rowset_id;
if (has_seq_col) {
size_t num_to_read = 1;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 8f7d27a947..a5c9fa147b 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -366,9 +366,13 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
std::vector<bool> use_default_flag;
use_default_flag.reserve(num_rows);
std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId> segment_caches;
- // locate rows in base data
+ std::vector<RowsetSharedPtr> specified_rowsets;
{
std::shared_lock rlock(_tablet->get_header_lock());
+ specified_rowsets = _tablet->get_rowset_by_ids(&_mow_context->rowset_ids);
+ }
+ // locate rows in base data
+ {
for (size_t pos = row_pos; pos < num_rows; pos++) {
std::string key = _full_encode_keys(key_columns, pos);
RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
@@ -377,7 +381,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
- auto st = _tablet->lookup_row_key(key, false, &_mow_context->rowset_ids, &loc,
+ auto st = _tablet->lookup_row_key(key, false, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
if (st.is<NOT_FOUND>()) {
if (!_tablet_schema->allow_key_not_exist_in_partial_update()) {
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 6a3749ca4f..79d0237931 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -303,10 +303,6 @@ Status Tablet::_init_once_action() {
_stale_rs_version_map[version] = std::move(rowset);
}
- if (_schema->keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
- _rowset_tree = std::make_unique<RowsetTree>();
- res = _rowset_tree->Init(rowset_vec);
- }
return res;
}
@@ -331,9 +327,6 @@ Status Tablet::revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add,
// reconstruct from tablet meta
_timestamped_version_tracker.construct_versioned_tracker(_tablet_meta->all_rs_metas());
if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
- auto new_rowset_tree = std::make_unique<RowsetTree>();
- ModifyRowSetTree(*_rowset_tree, to_delete, to_add, new_rowset_tree.get());
- _rowset_tree = std::move(new_rowset_tree);
std::vector<RowsetSharedPtr> calc_delete_bitmap_rowsets;
int64_t to_add_min_version = INT64_MAX;
int64_t to_add_max_version = INT64_MIN;
@@ -422,13 +415,6 @@ Status Tablet::add_rowset(RowsetSharedPtr rowset) {
_rs_version_map[rowset->version()] = rowset;
_timestamped_version_tracker.add_version(rowset->version());
- // Update rowset tree
- if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
- auto new_rowset_tree = std::make_unique<RowsetTree>();
- ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
- _rowset_tree = std::move(new_rowset_tree);
- }
-
std::vector<RowsetSharedPtr> rowsets_to_delete;
// yiguolei: temp code, should remove the rowset contains by this rowset
// but it should be removed in multi path version
@@ -521,13 +507,6 @@ Status Tablet::modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
_tablet_meta->modify_rs_metas(rs_metas_to_add, rs_metas_to_delete, same_version);
- // Update rowset tree
- if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
- auto new_rowset_tree = std::make_unique<RowsetTree>();
- ModifyRowSetTree(*_rowset_tree, to_delete, to_add, new_rowset_tree.get());
- _rowset_tree = std::move(new_rowset_tree);
- }
-
if (!same_version) {
// add rs_metas_to_delete to tracker
_timestamped_version_tracker.add_stale_path_version(rs_metas_to_delete);
@@ -658,13 +637,6 @@ Status Tablet::add_inc_rowset(const RowsetSharedPtr& rowset) {
RETURN_IF_ERROR(_tablet_meta->add_rs_meta(rowset->rowset_meta()));
_rs_version_map[rowset->version()] = rowset;
- // Update rowset tree
- if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
- auto new_rowset_tree = std::make_unique<RowsetTree>();
- ModifyRowSetTree(*_rowset_tree, {}, {rowset}, new_rowset_tree.get());
- _rowset_tree = std::move(new_rowset_tree);
- }
-
_timestamped_version_tracker.add_version(rowset->version());
++_newly_created_rowset_num;
@@ -1190,11 +1162,6 @@ void Tablet::delete_all_files() {
it.second->remove();
}
_stale_rs_version_map.clear();
-
- if (keys_type() == UNIQUE_KEYS && enable_unique_key_merge_on_write()) {
- // clear rowset_tree
- _rowset_tree = std::make_unique<RowsetTree>();
- }
}
void Tablet::check_tablet_path_exists() {
@@ -2722,71 +2689,71 @@ Status Tablet::lookup_row_data(const Slice& encoded_key, const RowLocation& row_
return Status::OK();
}
-// ATTN: caller should hold the meta lock.
Status Tablet::lookup_row_key(
- const Slice& encoded_key, bool with_seq_col, const RowsetIdUnorderedSet* rowset_ids,
- RowLocation* row_location, uint32_t version,
+ const Slice& encoded_key, bool with_seq_col,
+ const std::vector<RowsetSharedPtr>& specified_rowsets, RowLocation* row_location,
+ uint32_t version,
std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId>& segment_caches,
RowsetSharedPtr* rowset) {
- std::vector<std::pair<RowsetSharedPtr, int32_t>> selected_rs;
size_t seq_col_length = 0;
if (_schema->has_sequence_col() && with_seq_col) {
seq_col_length = _schema->column(_schema->sequence_col_idx()).length() + 1;
}
Slice key_without_seq = Slice(encoded_key.get_data(), encoded_key.get_size() - seq_col_length);
- _rowset_tree->FindRowsetsWithKeyInRange(key_without_seq, rowset_ids, &selected_rs);
- if (selected_rs.empty()) {
- return Status::NotFound("No rowsets contains the key in key range");
- }
- // Usually newly written data has a higher probability of being modified, so prefer
- // to search the key in the rowset with larger version.
- std::sort(selected_rs.begin(), selected_rs.end(),
- [](std::pair<RowsetSharedPtr, int32_t>& a, std::pair<RowsetSharedPtr, int32_t>& b) {
- if (a.first->end_version() == b.first->end_version()) {
- return a.second > b.second;
- }
- return a.first->end_version() > b.first->end_version();
- });
RowLocation loc;
- for (auto& rs : selected_rs) {
- if (rs.first->end_version() > version) {
+
+ for (auto& rs : specified_rowsets) {
+ auto& segments_key_bounds = rs->rowset_meta()->get_segments_key_bounds();
+ int num_segments = rs->num_segments();
+ DCHECK_EQ(segments_key_bounds.size(), num_segments);
+ std::vector<uint32_t> picked_segments;
+ for (int i = num_segments - 1; i >= 0; i--) {
+ if (key_without_seq.compare(segments_key_bounds[i].max_key()) > 0 ||
+ key_without_seq.compare(segments_key_bounds[i].min_key()) < 0) {
+ continue;
+ }
+ picked_segments.emplace_back(i);
+ }
+ if (picked_segments.empty()) {
continue;
}
- auto iter = segment_caches.find(rs.first->rowset_id());
+
+ auto iter = segment_caches.find(rs->rowset_id());
if (iter == segment_caches.end()) {
SegmentCacheHandle segment_cache_handle;
RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
- std::static_pointer_cast<BetaRowset>(rs.first), &segment_cache_handle, true));
- iter = segment_caches.emplace(rs.first->rowset_id(), std::move(segment_cache_handle))
- .first;
+ std::static_pointer_cast<BetaRowset>(rs), &segment_cache_handle, true));
+ iter = segment_caches.emplace(rs->rowset_id(), std::move(segment_cache_handle)).first;
}
auto& segments = iter->second.get_segments();
- DCHECK_GT(segments.size(), rs.second);
- Status s = segments[rs.second]->lookup_row_key(encoded_key, with_seq_col, &loc);
- if (s.is<NOT_FOUND>()) {
- continue;
- }
- if (!s.ok()) {
- return s;
- }
- loc.rowset_id = rs.first->rowset_id();
- if (_tablet_meta->delete_bitmap().contains_agg_without_cache(
- {loc.rowset_id, loc.segment_id, version}, loc.row_id)) {
- // if has sequence col, we continue to compare the sequence_id of
- // all rowsets, util we find an existing key.
- if (_schema->has_sequence_col()) {
+ DCHECK_EQ(segments.size(), num_segments);
+
+ for (auto id : picked_segments) {
+ Status s = segments[id]->lookup_row_key(encoded_key, with_seq_col, &loc);
+ if (s.is<NOT_FOUND>()) {
continue;
}
- // The key is deleted, we don't need to search for it any more.
- break;
- }
- *row_location = loc;
- if (rowset) {
- // return it's rowset
- *rowset = rs.first;
+ if (!s.ok()) {
+ return s;
+ }
+ if (_tablet_meta->delete_bitmap().contains_agg_without_cache(
+ {loc.rowset_id, loc.segment_id, version}, loc.row_id)) {
+ // if has sequence col, we continue to compare the sequence_id of
+ // all rowsets, until we find an existing key.
+ if (_schema->has_sequence_col()) {
+ continue;
+ }
+ // The key is deleted, we don't need to search for it any more.
+ break;
+ }
+ *row_location = loc;
+ if (rowset) {
+ // return its rowset
+ *rowset = rs;
+ }
+ // find it and return
+ return s;
}
- // find it and return
- return s;
}
return Status::NotFound("can't find key in all rowsets");
}
@@ -2837,7 +2804,7 @@ void Tablet::sort_block(vectorized::Block& in_block, vectorized::Block& output_b
Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
const segment_v2::SegmentSharedPtr& seg,
- const RowsetIdUnorderedSet* specified_rowset_ids,
+ const std::vector<RowsetSharedPtr>& specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t end_version,
RowsetWriter* rowset_writer) {
OlapStopWatch watch;
@@ -2900,52 +2867,49 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
continue;
}
- if (specified_rowset_ids != nullptr && !specified_rowset_ids->empty()) {
- RowsetSharedPtr rowset_find;
- auto st = lookup_row_key(key, true, specified_rowset_ids, &loc,
- dummy_version.first - 1, segment_caches, &rowset_find);
- bool expected_st = st.ok() || st.is<NOT_FOUND>() || st.is<ALREADY_EXIST>();
- DCHECK(expected_st) << "unexpected error status while lookup_row_key:" << st;
- if (!expected_st) {
- return st;
- }
- if (st.is<NOT_FOUND>()) {
- ++row_id;
- continue;
- }
+ RowsetSharedPtr rowset_find;
+ auto st = lookup_row_key(key, true, specified_rowsets, &loc, dummy_version.first - 1,
+ segment_caches, &rowset_find);
+ bool expected_st = st.ok() || st.is<NOT_FOUND>() || st.is<ALREADY_EXIST>();
+ DCHECK(expected_st) << "unexpected error status while lookup_row_key:" << st;
+ if (!expected_st) {
+ return st;
+ }
+ if (st.is<NOT_FOUND>()) {
+ ++row_id;
+ continue;
+ }
- // sequence id smaller than the previous one, so delete current row
- if (st.is<ALREADY_EXIST>()) {
- delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
- continue;
- } else if (is_partial_update && rowset_writer != nullptr) {
- // In publish version, record rows to be deleted for concurrent update
- // For example, if version 5 and 6 update a row, but version 6 only see
- // version 4 when write, and when publish version, version 5's value will
- // be marked as deleted and it's update is losed.
- // So here we should read version 5's columns and build a new row, which is
- // consists of version 6's update columns and version 5's origin columns
- // here we build 2 read plan for ori values and update values
- prepare_to_read(loc, pos, &read_plan_ori);
- prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos,
- &read_plan_update);
- rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
- ++pos;
- // delete bitmap will be calculate when memtable flush and
- // publish. The two stages may see different versions.
- // When there is sequence column, the currently imported data
- // of rowset may be marked for deletion at memtablet flush or
- // publish because the seq column is smaller than the previous
- // rowset.
- // just set 0 as a unified temporary version number, and update to
- // the real version number later.
- delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id);
- delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
- continue;
- }
- // when st = ok
+ // sequence id smaller than the previous one, so delete current row
+ if (st.is<ALREADY_EXIST>()) {
+ delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
+ continue;
+ } else if (is_partial_update && rowset_writer != nullptr) {
+ // In publish version, record rows to be deleted for concurrent updates.
+ // For example, if versions 5 and 6 both update a row, but version 6 only saw
+ // version 4 when it was written, then at publish time version 5's value would
+ // be marked as deleted and its update would be lost.
+ // So here we should read version 5's columns and build a new row, which
+ // consists of version 6's updated columns and version 5's original columns.
+ // Here we build 2 read plans, for the original values and the updated values.
+ prepare_to_read(loc, pos, &read_plan_ori);
+ prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update);
+ rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
+ ++pos;
+ // The delete bitmap will be calculated at memtable flush and at
+ // publish. The two stages may see different versions.
+ // When there is a sequence column, the currently imported data
+ // of the rowset may be marked for deletion at memtable flush or
+ // publish because the seq column is smaller than the previous
+ // rowset.
+ // Just set 0 as a unified temporary version number, and update it to
+ // the real version number later.
delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id);
+ delete_bitmap->add({rowset_id, seg->id(), 0}, row_id);
+ continue;
}
+ // when st = ok
+ delete_bitmap->add({loc.rowset_id, loc.segment_id, 0}, loc.row_id);
++row_id;
}
remaining -= num_read;
@@ -2959,18 +2923,19 @@ Status Tablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}
LOG(INFO) << "calc segment delete bitmap, tablet: " << tablet_id() << " rowset: " << rowset_id
<< " seg_id: " << seg->id() << " dummy_version: " << end_version + 1
- << " rows: " << seg->num_rows() << " cost: " << watch.get_elapse_time_us() << "(us)";
+ << " rows: " << seg->num_rows()
+ << " bimap num: " << delete_bitmap->delete_bitmap.size()
+ << " cost: " << watch.get_elapse_time_us() << "(us)";
return Status::OK();
}
-// caller should hold meta_lock
Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
const std::vector<segment_v2::SegmentSharedPtr>& segments,
- const RowsetIdUnorderedSet* specified_rowset_ids,
+ const std::vector<RowsetSharedPtr>& specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t end_version,
RowsetWriter* rowset_writer) {
auto rowset_id = rowset->rowset_id();
- if (specified_rowset_ids == nullptr || specified_rowset_ids->empty() || segments.empty()) {
+ if (specified_rowsets.empty() || segments.empty()) {
LOG(INFO) << "skip to construct delete bitmap tablet: " << tablet_id()
<< " rowset: " << rowset_id;
return Status::OK();
@@ -2987,8 +2952,8 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
DeleteBitmapPtr seg_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
seg_delete_bitmaps.push_back(seg_delete_bitmap);
RETURN_IF_ERROR(token->submit_func([=, &calc_status, this]() {
- auto st = calc_segment_delete_bitmap(rowset, seg, specified_rowset_ids,
- seg_delete_bitmap, end_version, rowset_writer);
+ auto st = calc_segment_delete_bitmap(rowset, seg, specified_rowsets, seg_delete_bitmap,
+ end_version, rowset_writer);
if (!st.ok()) {
LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: " << tablet_id()
<< " rowset: " << rowset_id << " seg_id: " << seg->id()
@@ -2999,7 +2964,7 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
}
// this thread calc delete bitmap of segment 0
- RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[0], specified_rowset_ids,
+ RETURN_IF_ERROR(calc_segment_delete_bitmap(rowset, segments[0], specified_rowsets,
delete_bitmap, end_version, rowset_writer));
token->wait();
auto code = calc_status.load();
@@ -3012,6 +2977,21 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
return Status::OK();
}
+std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
+ const RowsetIdUnorderedSet* specified_rowset_ids) {
+ std::vector<RowsetSharedPtr> rowsets;
+ for (auto& rs : _rs_version_map) {
+ if (!specified_rowset_ids ||
+ specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) {
+ rowsets.push_back(rs.second);
+ }
+ }
+ std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) {
+ return lhs->end_version() > rhs->end_version();
+ });
+ return rowsets;
+}
+
Status Tablet::generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateReadPlan& read_plan_ori,
const PartialUpdateReadPlan& read_plan_update,
@@ -3166,9 +3146,10 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
RETURN_IF_ERROR(calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
+ std::vector<RowsetSharedPtr> specified_rowsets = get_rowset_by_ids(&cur_rowset_ids);
OlapStopWatch watch;
- RETURN_IF_ERROR(
- calc_delete_bitmap(rowset, segments, &cur_rowset_ids, delete_bitmap, cur_version - 1));
+ RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
+ cur_version - 1));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
@@ -3176,7 +3157,6 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
<< ", rowset_ids: " << cur_rowset_ids.size() << ", cur max_version: " << cur_version
<< ", transaction_id: " << -1 << ", cost: " << watch.get_elapse_time_us()
<< "(us), total rows: " << total_rows;
-
for (auto iter = delete_bitmap->delete_bitmap.begin();
iter != delete_bitmap->delete_bitmap.end(); ++iter) {
_tablet_meta->delete_bitmap().merge(
@@ -3187,23 +3167,33 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
}
Status Tablet::commit_phase_update_delete_bitmap(
- const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids,
- DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
- const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id,
- RowsetWriter* rowset_writer) {
+ const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids,
+ DeleteBitmapPtr delete_bitmap, const std::vector<segment_v2::SegmentSharedPtr>& segments,
+ int64_t txn_id, RowsetWriter* rowset_writer) {
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
RowsetIdUnorderedSet rowset_ids_to_del;
+ int64_t cur_version;
- std::shared_lock meta_rlock(_meta_lock);
- cur_rowset_ids = all_rs_id(cur_version);
- _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del);
+ std::vector<RowsetSharedPtr> specified_rowsets;
+ {
+ std::shared_lock meta_rlock(_meta_lock);
+ cur_version = max_version().second;
+ cur_rowset_ids = all_rs_id(cur_version);
+ _rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add,
+ &rowset_ids_to_del);
+ if (!rowset_ids_to_add.empty() || !rowset_ids_to_del.empty()) {
+ LOG(INFO) << "rowset_ids_to_add: " << rowset_ids_to_add.size()
+ << ", rowset_ids_to_del: " << rowset_ids_to_del.size();
+ }
+ specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
+ }
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
}
OlapStopWatch watch;
- RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap,
+ RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
cur_version, rowset_writer));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
@@ -3213,6 +3203,7 @@ Status Tablet::commit_phase_update_delete_bitmap(
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
<< ", cur max_version: " << cur_version << ", transaction_id: " << txn_id
<< ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;
+ pre_rowset_ids = cur_rowset_ids;
return Status::OK();
}
@@ -3229,22 +3220,30 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset,
_load_rowset_segments(rowset, &segments);
std::lock_guard<std::mutex> rwlock(_rowset_update_lock);
- std::shared_lock meta_rlock(_meta_lock);
- // tablet is under alter process. The delete bitmap will be calculated after conversion.
- if (tablet_state() == TABLET_NOTREADY &&
- SchemaChangeHandler::tablet_in_converting(tablet_id())) {
- LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
- << tablet_id();
- return Status::OK();
+ {
+ std::shared_lock meta_rlock(_meta_lock);
+ // tablet is under alter process. The delete bitmap will be calculated after conversion.
+ if (tablet_state() == TABLET_NOTREADY &&
+ SchemaChangeHandler::tablet_in_converting(tablet_id())) {
+ LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
+ << tablet_id();
+ return Status::OK();
+ }
+ cur_rowset_ids = all_rs_id(cur_version - 1);
}
- cur_rowset_ids = all_rs_id(cur_version - 1);
_rowset_ids_difference(cur_rowset_ids, pre_rowset_ids, &rowset_ids_to_add, &rowset_ids_to_del);
for (const auto& to_del : rowset_ids_to_del) {
delete_bitmap->remove({to_del, 0, 0}, {to_del, UINT32_MAX, INT64_MAX});
}
+ std::vector<RowsetSharedPtr> specified_rowsets;
+ {
+ std::shared_lock meta_rlock(_meta_lock);
+ specified_rowsets = get_rowset_by_ids(&rowset_ids_to_add);
+ }
+
OlapStopWatch watch;
- RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, &rowset_ids_to_add, delete_bitmap,
+ RETURN_IF_ERROR(calc_delete_bitmap(rowset, segments, specified_rowsets, delete_bitmap,
cur_version - 1, rowset_writer));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 3c3a010397..7cccb03a31 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -47,7 +47,6 @@
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader.h"
-#include "olap/rowset/rowset_tree.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_meta.h"
#include "olap/tablet_schema.h"
@@ -401,8 +400,9 @@ public:
// NOTE: the method only works in unique key model with primary key index, you will got a
// not supported error in other data model.
Status lookup_row_key(
- const Slice& encoded_key, bool with_seq_col, const RowsetIdUnorderedSet* rowset_ids,
- RowLocation* row_location, uint32_t version,
+ const Slice& encoded_key, bool with_seq_col,
+ const std::vector<RowsetSharedPtr>& specified_rowsets, RowLocation* row_location,
+ uint32_t version,
std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId>& segment_caches,
RowsetSharedPtr* rowset = nullptr);
@@ -429,14 +429,19 @@ public:
// and build newly generated rowset's delete_bitmap
Status calc_delete_bitmap(RowsetSharedPtr rowset,
const std::vector<segment_v2::SegmentSharedPtr>& segments,
- const RowsetIdUnorderedSet* specified_rowset_ids,
+ const std::vector<RowsetSharedPtr>& specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t version,
RowsetWriter* rowset_writer = nullptr);
+
+ std::vector<RowsetSharedPtr> get_rowset_by_ids(
+ const RowsetIdUnorderedSet* specified_rowset_ids);
+
Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
const segment_v2::SegmentSharedPtr& seg,
- const RowsetIdUnorderedSet* specified_rowset_ids,
+ const std::vector<RowsetSharedPtr>& specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t end_version,
RowsetWriter* rowset_writer);
+
Status calc_delete_bitmap_between_segments(
RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
DeleteBitmapPtr delete_bitmap);
@@ -456,8 +461,8 @@ public:
Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset);
Status commit_phase_update_delete_bitmap(
- const RowsetSharedPtr& rowset, const RowsetIdUnorderedSet& pre_rowset_ids,
- DeleteBitmapPtr delete_bitmap, const int64_t& cur_version,
+ const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids,
+ DeleteBitmapPtr delete_bitmap,
const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id,
RowsetWriter* rowset_writer = nullptr);
@@ -605,9 +610,6 @@ private:
// These _stale rowsets are been removed when rowsets' pathVersion is expired,
// this policy is judged and computed by TimestampedVersionTracker.
std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;
- // RowsetTree is used to locate rowsets containing a key or a key range quickly.
- // It's only used in UNIQUE_KEYS data model.
- std::unique_ptr<RowsetTree> _rowset_tree;
// if this tablet is broken, set to true. default is false
std::atomic<bool> _is_bad;
// timestamp of last cumu compaction failure
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 4f08e50a79..9641e6e61a 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -942,6 +942,7 @@ bool DeleteBitmap::contains_agg(const BitmapKey& bmk, uint32_t row_id) const {
}
bool DeleteBitmap::contains_agg_without_cache(const BitmapKey& bmk, uint32_t row_id) const {
+ std::shared_lock l(lock);
DeleteBitmap::BitmapKey start {std::get<0>(bmk), std::get<1>(bmk), 0};
for (auto it = delete_bitmap.lower_bound(start); it != delete_bitmap.end(); ++it) {
auto& [k, bm] = *it;
diff --git a/be/src/service/point_query_executor.cpp b/be/src/service/point_query_executor.cpp
index 0e8acf62fe..255d4c9a2c 100644
--- a/be/src/service/point_query_executor.cpp
+++ b/be/src/service/point_query_executor.cpp
@@ -246,6 +246,11 @@ Status PointQueryExecutor::_lookup_row_key() {
// 2. lookup row location
Status st;
std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId> segment_caches;
+ std::vector<RowsetSharedPtr> specified_rowsets;
+ {
+ std::shared_lock rlock(_tablet->get_header_lock());
+ specified_rowsets = _tablet->get_rowset_by_ids(nullptr);
+ }
for (size_t i = 0; i < _row_read_ctxs.size(); ++i) {
RowLocation location;
if (!config::disable_storage_row_cache) {
@@ -260,8 +265,9 @@ Status PointQueryExecutor::_lookup_row_key() {
}
// Get rowlocation and rowset, ctx._rowset_ptr will acquire wrap this ptr
auto rowset_ptr = std::make_unique<RowsetSharedPtr>();
- st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, true, nullptr, &location,
- INT32_MAX /*rethink?*/, segment_caches, rowset_ptr.get()));
+ st = (_tablet->lookup_row_key(_row_read_ctxs[i]._primary_key, true, specified_rowsets,
+ &location, INT32_MAX /*rethink?*/, segment_caches,
+ rowset_ptr.get()));
if (st.is_not_found()) {
continue;
}
diff --git a/be/test/olap/tablet_test.cpp b/be/test/olap/tablet_test.cpp
index f2ee6b2d96..3ae55f9743 100644
--- a/be/test/olap/tablet_test.cpp
+++ b/be/test/olap/tablet_test.cpp
@@ -392,66 +392,4 @@ TEST_F(TestTablet, cooldown_policy) {
}
}
-TEST_F(TestTablet, rowset_tree_update) {
- TTabletSchema tschema;
- tschema.keys_type = TKeysType::UNIQUE_KEYS;
- TabletMetaSharedPtr tablet_meta = new_tablet_meta(tschema, true);
- TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
- RowsetIdUnorderedSet rowset_ids;
- tablet->init();
-
- RowsetMetaSharedPtr rsm1(new RowsetMeta());
- init_rs_meta(rsm1, 6, 7, convert_key_bounds({{"100", "200"}, {"300", "400"}}));
- RowsetId id1;
- id1.init(10010);
- RowsetSharedPtr rs_ptr1;
- MockRowset::create_rowset(tablet->tablet_schema(), "", rsm1, &rs_ptr1, false);
- tablet->add_inc_rowset(rs_ptr1);
- rowset_ids.insert(id1);
-
- RowsetMetaSharedPtr rsm2(new RowsetMeta());
- init_rs_meta(rsm2, 8, 8, convert_key_bounds({{"500", "999"}}));
- RowsetId id2;
- id2.init(10086);
- rsm2->set_rowset_id(id2);
- RowsetSharedPtr rs_ptr2;
- MockRowset::create_rowset(tablet->tablet_schema(), "", rsm2, &rs_ptr2, false);
- tablet->add_inc_rowset(rs_ptr2);
- rowset_ids.insert(id2);
-
- RowsetId id3;
- id3.init(540081);
- rowset_ids.insert(id3);
-
- std::unordered_map<RowsetId, SegmentCacheHandle, HashOfRowsetId> segment_caches;
- RowLocation loc;
- // Key not in range.
- ASSERT_TRUE(tablet->lookup_row_key("99", true, &rowset_ids, &loc, 7, segment_caches)
- .is<ErrorCode::NOT_FOUND>());
- // Version too low.
- ASSERT_TRUE(tablet->lookup_row_key("101", true, &rowset_ids, &loc, 3, segment_caches)
- .is<ErrorCode::NOT_FOUND>());
- // Hit a segment, but since we don't have real data, return an internal error when loading the
- // segment.
- LOG(INFO) << tablet->lookup_row_key("101", true, &rowset_ids, &loc, 7, segment_caches)
- .to_string();
- ASSERT_TRUE(tablet->lookup_row_key("101", true, &rowset_ids, &loc, 7, segment_caches)
- .is<ErrorCode::IO_ERROR>());
- // Key not in range.
- ASSERT_TRUE(tablet->lookup_row_key("201", true, &rowset_ids, &loc, 7, segment_caches)
- .is<ErrorCode::NOT_FOUND>());
- ASSERT_TRUE(tablet->lookup_row_key("300", true, &rowset_ids, &loc, 7, segment_caches)
- .is<ErrorCode::IO_ERROR>());
- // Key not in range.
- ASSERT_TRUE(tablet->lookup_row_key("499", true, &rowset_ids, &loc, 7, segment_caches)
- .is<ErrorCode::NOT_FOUND>());
- // Version too low.
- ASSERT_TRUE(tablet->lookup_row_key("500", true, &rowset_ids, &loc, 7, segment_caches)
- .is<ErrorCode::NOT_FOUND>());
- // Hit a segment, but since we don't have real data, return an internal error when loading the
- // segment.
- ASSERT_TRUE(tablet->lookup_row_key("500", true, &rowset_ids, &loc, 8, segment_caches)
- .is<ErrorCode::IO_ERROR>());
-}
-
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org