You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/06 15:15:31 UTC
[doris] 29/36: [fix](load) fix generate delete bitmap in memtable flush (#20446)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 6fefe31e48ba0538ff57961f9831e66916f979e7
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Tue Jun 6 09:48:30 2023 +0800
[fix](load) fix generate delete bitmap in memtable flush (#20446)
1. Generate the delete bitmap for one segment at a time.
2. Generate the delete bitmap before segment compaction runs.
Fix #20445
---
be/src/olap/memtable.cpp | 20 +++++++-------------
be/src/olap/memtable.h | 3 +--
be/src/olap/rowset/beta_rowset_writer.cpp | 2 ++
be/src/olap/rowset/beta_rowset_writer.h | 2 --
be/src/olap/rowset/rowset_writer.h | 6 ++++--
5 files changed, 14 insertions(+), 19 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index d61a25c8be..9a6e5687d2 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -443,21 +443,16 @@ bool MemTable::need_agg() const {
return false;
}
-Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
- int64_t atomic_num_segments_after_flush) {
+Status MemTable::_generate_delete_bitmap(int32_t segment_id) {
SCOPED_RAW_TIMER(&_stat.delete_bitmap_ns);
// generate delete bitmap, build a tmp rowset and load recent segment
if (!_tablet->enable_unique_key_merge_on_write()) {
return Status::OK();
}
- if (atomic_num_segments_before_flush >= atomic_num_segments_after_flush) {
- return Status::OK();
- }
auto rowset = _rowset_writer->build_tmp();
auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
std::vector<segment_v2::SegmentSharedPtr> segments;
- RETURN_IF_ERROR(beta_rowset->load_segments(atomic_num_segments_before_flush,
- atomic_num_segments_after_flush, &segments));
+ RETURN_IF_ERROR(beta_rowset->load_segments(segment_id, segment_id + 1, &segments));
std::shared_lock meta_rlock(_tablet->get_header_lock());
// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (_tablet->tablet_state() == TABLET_NOTREADY &&
@@ -477,15 +472,9 @@ Status MemTable::flush() {
// The id of new segment is set by the _num_segment of beta_rowset_writer,
// and new segment ids is between [atomic_num_segments_before_flush, atomic_num_segments_after_flush),
// and use the ids to load segment data file for calc delete bitmap.
- int64_t atomic_num_segments_before_flush = _rowset_writer->get_atomic_num_segment();
int64_t duration_ns;
SCOPED_RAW_TIMER(&duration_ns);
SKIP_MEMORY_CHECK(RETURN_IF_ERROR(_do_flush()));
- int64_t atomic_num_segments_after_flush = _rowset_writer->get_atomic_num_segment();
- if (!_tablet_schema->is_partial_update()) {
- RETURN_IF_ERROR(_generate_delete_bitmap(atomic_num_segments_before_flush,
- atomic_num_segments_after_flush));
- }
_delta_writer_callback(_stat);
DorisMetrics::instance()->memtable_flush_total->increment(1);
DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
@@ -515,6 +504,11 @@ Status MemTable::_do_flush() {
// Unfold variant column
RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
}
+ if (!_tablet_schema->is_partial_update()) {
+ ctx.generate_delete_bitmap = [this](size_t segment_id) {
+ return _generate_delete_bitmap(segment_id);
+ };
+ }
ctx.segment_id = _segment_id;
SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, &_flush_size, &ctx));
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index e7844f5d93..912f2f4211 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -216,8 +216,7 @@ private:
void _aggregate_two_row_in_block(vectorized::MutableBlock& mutable_block, RowInBlock* new_row,
RowInBlock* row_in_skiplist);
- Status _generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
- int64_t atomic_num_segments_after_flush);
+ Status _generate_delete_bitmap(int32_t segment_id);
// serialize block to row store format and append serialized data into row store column
// in block
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index e75075d27f..c729a65077 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -491,8 +491,10 @@ Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block, i
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(&writer, ctx));
+ int32_t segment_id = writer->get_segment_id();
RETURN_IF_ERROR(_add_block(block, &writer));
RETURN_IF_ERROR(_flush_segment_writer(&writer, flush_size));
+ RETURN_IF_ERROR(ctx->generate_delete_bitmap(segment_id));
RETURN_IF_ERROR(_segcompaction_if_necessary());
return Status::OK();
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 88219e5941..b646f2a681 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -106,8 +106,6 @@ public:
return Status::OK();
}
- int32_t get_atomic_num_segment() const override { return _num_segment.load(); }
-
int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };
// Maybe modified by local schema change
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 0ae5fd46c5..3ccd481ec7 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -20,6 +20,7 @@
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/types.pb.h>
+#include <functional>
#include <optional>
#include "common/factory_creator.h"
@@ -40,6 +41,9 @@ struct FlushContext {
TabletSchemaSPtr flush_schema = nullptr;
const vectorized::Block* block = nullptr;
std::optional<int32_t> segment_id = std::nullopt;
+ std::function<Status(int32_t)> generate_delete_bitmap = [](int32_t segment_id) {
+ return Status::OK();
+ };
};
class RowsetWriter {
@@ -100,8 +104,6 @@ public:
return Status::NotSupported("to be implemented");
}
- virtual int32_t get_atomic_num_segment() const = 0;
-
virtual int32_t allocate_segment_id() = 0;
virtual bool is_doing_segcompaction() const = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org