You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/19 05:10:35 UTC

[doris] 01/21: [fix](merge-on-write) calc delete bitmap need all segments which _do_flush in one memtable (#15018)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b027ae3490627ed6bb780ee1a7d064eda23ec4b3
Author: xueweizhang <zx...@163.com>
AuthorDate: Thu Dec 15 20:44:49 2022 +0800

    [fix](merge-on-write) calc delete bitmap need all segments which _do_flush in one memtable (#15018)
    
    In some cases (which require modifying be.conf), a single memtable may flush multiple segments, after which the delete bitmap is calculated against the new data. Currently, however, only the segment with the largest segment id is loaded. As a result, the delete bitmap is not calculated over the data of all segments flushed from that memtable, and a merge-on-write table can end up with many rows sharing the same key.
---
 be/src/olap/memtable.cpp                | 19 ++++++++++++------
 be/src/olap/memtable.h                  |  3 ++-
 be/src/olap/rowset/beta_rowset.cpp      | 35 ++++++++++++++++++++-------------
 be/src/olap/rowset/beta_rowset.h        |  3 ++-
 be/src/olap/rowset/beta_rowset_writer.h |  2 ++
 be/src/olap/rowset/rowset_writer.h      |  2 ++
 6 files changed, 42 insertions(+), 22 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 892592b797..ab1a0f5209 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -415,7 +415,8 @@ bool MemTable::need_to_agg() {
                                              : memory_usage() >= config::memtable_max_buffer_size;
 }
 
-Status MemTable::_generate_delete_bitmap() {
+Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
+                                         int64_t atomic_num_segments_after_flush) {
     // generate delete bitmap, build a tmp rowset and load recent segment
     if (!_tablet->enable_unique_key_merge_on_write()) {
         return Status::OK();
@@ -423,12 +424,11 @@ Status MemTable::_generate_delete_bitmap() {
     auto rowset = _rowset_writer->build_tmp();
     auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
     std::vector<segment_v2::SegmentSharedPtr> segments;
-    segment_v2::SegmentSharedPtr segment;
-    if (beta_rowset->num_segments() == 0) {
+    if (atomic_num_segments_before_flush >= atomic_num_segments_after_flush) {
         return Status::OK();
     }
-    RETURN_IF_ERROR(beta_rowset->load_segment(beta_rowset->num_segments() - 1, &segment));
-    segments.push_back(segment);
+    RETURN_IF_ERROR(beta_rowset->load_segments(atomic_num_segments_before_flush,
+                                               atomic_num_segments_after_flush, &segments));
     std::shared_lock meta_rlock(_tablet->get_header_lock());
     // tablet is under alter process. The delete bitmap will be calculated after conversion.
     if (_tablet->tablet_state() == TABLET_NOTREADY &&
@@ -445,8 +445,15 @@ Status MemTable::flush() {
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << tablet_id()
                   << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
+    // For merge_on_write table, it must get all segments in this flush.
+    // The id of new segment is set by the _num_segment of beta_rowset_writer,
+    // and new segment ids is between [atomic_num_segments_before_flush, atomic_num_segments_after_flush),
+    // and use the ids to load segment data file for calc delete bitmap.
+    int64_t atomic_num_segments_before_flush = _rowset_writer->get_atomic_num_segment();
     RETURN_NOT_OK(_do_flush(duration_ns));
-    RETURN_NOT_OK(_generate_delete_bitmap());
+    int64_t atomic_num_segments_after_flush = _rowset_writer->get_atomic_num_segment();
+    RETURN_NOT_OK(_generate_delete_bitmap(atomic_num_segments_before_flush,
+                                          atomic_num_segments_after_flush));
     DorisMetrics::instance()->memtable_flush_total->increment(1);
     DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000);
     VLOG_CRITICAL << "after flush memtable for tablet: " << tablet_id()
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 52589f35cc..e7f59ff151 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -146,7 +146,8 @@ private:
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
 
-    Status _generate_delete_bitmap();
+    Status _generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
+                                   int64_t atomic_num_segments_after_flush);
 
 private:
     TabletSharedPtr _tablet;
diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp
index 8ed48fdf70..f04fbf5922 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -130,20 +130,27 @@ Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segm
     return Status::OK();
 }
 
-Status BetaRowset::load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment) {
-    DCHECK(seg_id >= 0);
-    auto fs = _rowset_meta->fs();
-    if (!fs || _schema == nullptr) {
-        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
-    }
-    auto seg_path = segment_file_path(seg_id);
-    auto cache_path = segment_cache_path(seg_id);
-    auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, rowset_id(), _schema,
-                                       segment);
-    if (!s.ok()) {
-        LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset " << unique_id()
-                     << " : " << s.to_string();
-        return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+Status BetaRowset::load_segments(int64_t seg_id_begin, int64_t seg_id_end,
+                                 std::vector<segment_v2::SegmentSharedPtr>* segments) {
+    int64_t seg_id = seg_id_begin;
+    while (seg_id < seg_id_end) {
+        DCHECK(seg_id >= 0);
+        auto fs = _rowset_meta->fs();
+        if (!fs || _schema == nullptr) {
+            return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+        }
+        auto seg_path = segment_file_path(seg_id);
+        auto cache_path = segment_cache_path(seg_id);
+        std::shared_ptr<segment_v2::Segment> segment;
+        auto s = segment_v2::Segment::open(fs, seg_path, cache_path, seg_id, rowset_id(), _schema,
+                                           &segment);
+        if (!s.ok()) {
+            LOG(WARNING) << "failed to open segment. " << seg_path << " under rowset "
+                         << unique_id() << " : " << s.to_string();
+            return Status::OLAPInternalError(OLAP_ERR_ROWSET_LOAD_FAILED);
+        }
+        segments->push_back(std::move(segment));
+        seg_id++;
     }
     return Status::OK();
 }
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 7cd792bf69..1087fdcd18 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -87,7 +87,8 @@ public:
 
     Status load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments);
 
-    Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
+    Status load_segments(int64_t seg_id_begin, int64_t seg_id_end,
+                         std::vector<segment_v2::SegmentSharedPtr>* segments);
 
 protected:
     BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 65662ecf1a..5f4077d4b5 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -80,6 +80,8 @@ public:
 
     void compact_segments(SegCompactionCandidatesSharedPtr segments);
 
+    int32_t get_atomic_num_segment() const override { return _num_segment.load(); }
+
 private:
     template <typename RowType>
     Status _add_row(const RowType& row);
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 2713b3c60c..b56262643e 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -84,6 +84,8 @@ public:
         return Status::NotSupported("to be implemented");
     }
 
+    virtual int32_t get_atomic_num_segment() const = 0;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org