You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/02 05:50:02 UTC

[doris] branch master updated: [performance](load) support parallel memtable flush for unique key tables (#20308)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a869056567 [performance](load) support parallel memtable flush for unique key tables (#20308)
a869056567 is described below

commit a869056567f5363d2cbbb9bdd8e8007fcd4138b6
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Fri Jun 2 13:49:53 2023 +0800

    [performance](load) support parallel memtable flush for unique key tables (#20308)
---
 be/src/olap/delta_writer.cpp                       | 10 ++--
 be/src/olap/memtable.cpp                           | 11 ++--
 be/src/olap/memtable.h                             |  5 ++
 be/src/olap/rowset/beta_rowset_writer.cpp          | 59 ++++++++++++++++------
 be/src/olap/rowset/beta_rowset_writer.h            | 14 ++++-
 be/src/olap/rowset/rowset_writer.h                 |  5 ++
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp |  5 ++
 7 files changed, 86 insertions(+), 23 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 4a7a5760a1..8386847894 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -209,9 +209,9 @@ Status DeltaWriter::init() {
     _reset_mem_table();
 
     // create flush handler
-    // unique key should flush serial because we need to make sure same key should sort
-    // in the same order in all replica.
-    bool should_serial = _tablet->keys_type() == KeysType::UNIQUE_KEYS;
+    // by assigning a segment_id to the memtable before submitting it to the flush executor,
+    // we make sure that identical keys are sorted in the same order on all replicas.
+    bool should_serial = false;
     RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
             &_flush_token, _rowset_writer->type(), should_serial, _req.is_high_priority));
 
@@ -267,6 +267,10 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
 }
 
 Status DeltaWriter::_flush_memtable_async() {
+    if (_mem_table->empty()) {
+        return Status::OK();
+    }
+    _mem_table->assign_segment_id();
     return _flush_token->submit(std::move(_mem_table));
 }
 
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c398a22b15..cd4db46d20 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -418,12 +418,12 @@ Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flus
     if (!_tablet->enable_unique_key_merge_on_write()) {
         return Status::OK();
     }
-    auto rowset = _rowset_writer->build_tmp();
-    auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
-    std::vector<segment_v2::SegmentSharedPtr> segments;
     if (atomic_num_segments_before_flush >= atomic_num_segments_after_flush) {
         return Status::OK();
     }
+    auto rowset = _rowset_writer->build_tmp();
+    auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
+    std::vector<segment_v2::SegmentSharedPtr> segments;
     RETURN_IF_ERROR(beta_rowset->load_segments(atomic_num_segments_before_flush,
                                                atomic_num_segments_after_flush, &segments));
     std::shared_lock meta_rlock(_tablet->get_header_lock());
@@ -483,11 +483,16 @@ Status MemTable::_do_flush() {
         // Unfold variant column
         RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
     }
+    ctx.segment_id = _segment_id;
     SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
     RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, &_flush_size, &ctx));
     return Status::OK();
 }
 
+void MemTable::assign_segment_id() {
+    _segment_id = std::optional<int32_t> {_rowset_writer->allocate_segment_id()};
+}
+
 Status MemTable::close() {
     return flush();
 }
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 95d41df62d..d6acc5211c 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -22,6 +22,7 @@
 
 #include <functional>
 #include <memory>
+#include <optional>
 #include <ostream>
 #include <vector>
 
@@ -145,6 +146,9 @@ public:
         _delta_writer_callback = callback;
     }
 
+    bool empty() const { return _input_mutable_block.rows() == 0; }
+    void assign_segment_id();
+
 private:
     Status _do_flush();
 
@@ -225,6 +229,7 @@ private:
     void _put_into_output(vectorized::Block& in_block);
     bool _is_first_insertion;
     std::function<void(MemTableStat&)> _delta_writer_callback;
+    std::optional<int32_t> _segment_id = std::nullopt;
 
     void _init_agg_functions(const vectorized::Block* block);
     std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 5358732daa..e75075d27f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -60,8 +60,8 @@ using namespace ErrorCode;
 
 BetaRowsetWriter::BetaRowsetWriter()
         : _rowset_meta(nullptr),
+          _next_segment_id(0),
           _num_segment(0),
-          _num_flushed_segment(0),
           _segment_start_id(0),
           _segcompacted_point(0),
           _num_segcompacted(0),
@@ -89,9 +89,10 @@ BetaRowsetWriter::~BetaRowsetWriter() {
         if (!fs) {
             return;
         }
-        for (int i = 0; i < _num_segment; ++i) {
-            std::string seg_path =
-                    BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i);
+        auto max_segment_id = std::max(_num_segment.load(), _next_segment_id.load());
+        for (int i = 0; i < max_segment_id; ++i) {
+            std::string seg_path = BetaRowset::segment_file_path(
+                    _context.rowset_dir, _context.rowset_id, _segment_start_id + i);
             // Even if an error is encountered, these files that have not been cleaned up
             // will be cleaned up by the GC background. So here we only print the error
             // message when we encounter an error.
@@ -357,8 +358,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
     if (_segcompaction_status.load() != OK) {
         status = Status::Error<SEGCOMPACTION_FAILED>();
     } else if ((_num_segment - _segcompacted_point) >=
-                       config::segcompaction_threshold_segment_num &&
-               _num_segment == _num_flushed_segment) /* no segment in the middle is flushing */ {
+               config::segcompaction_threshold_segment_num) {
         SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
         status = _get_segcompaction_candidates(segments, false);
         if (LIKELY(status.ok()) && (segments->size() > 0)) {
@@ -408,6 +408,17 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
     return status;
 }
 
+Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block,
+                                       std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
+                                       size_t row_offset, size_t input_row_num) {
+    auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
+    if (UNLIKELY(!s.ok())) {
+        LOG(WARNING) << "failed to append block: " << s.to_string();
+        return Status::Error<WRITER_DATA_WRITE_ERROR>();
+    }
+    return Status::OK();
+}
+
 Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
                                     std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
                                     const FlushContext* flush_ctx) {
@@ -416,6 +427,13 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
     size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
     size_t row_offset = 0;
 
+    if (flush_ctx != nullptr && flush_ctx->segment_id.has_value()) {
+        // the entire block (memtable) should be flushed into a single segment
+        RETURN_IF_ERROR(_do_add_block(block, segment_writer, 0, block_row_num));
+        _raw_num_rows_written += block_row_num;
+        return Status::OK();
+    }
+
     do {
         auto max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
         if (UNLIKELY(max_row_add < 1)) {
@@ -426,11 +444,7 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
             DCHECK(max_row_add > 0);
         }
         size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
-        auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
-        if (UNLIKELY(!s.ok())) {
-            LOG(WARNING) << "failed to append block: " << s.to_string();
-            return Status::Error<WRITER_DATA_WRITE_ERROR>();
-        }
+        RETURN_IF_ERROR(_do_add_block(block, segment_writer, row_offset, input_row_num));
         row_offset += input_row_num;
     } while (row_offset < block_row_num);
 
@@ -445,6 +459,9 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
     _total_data_size += rowset->rowset_meta()->data_disk_size();
     _total_index_size += rowset->rowset_meta()->index_disk_size();
     _num_segment += rowset->num_segments();
+    // _next_segment_id is not used in this code path; it is only updated here
+    // to keep it consistent with _num_segment
+    _next_segment_id = _num_segment.load();
     // append key_bounds to current rowset
     rowset->get_segments_key_bounds(&_segments_encoded_key_bounds);
     // TODO update zonemap
@@ -515,6 +532,8 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r
 }
 
 RowsetSharedPtr BetaRowsetWriter::build() {
+    // make sure all segments are flushed
+    DCHECK_EQ(_num_segment, _next_segment_id);
     // TODO(lingbin): move to more better place, or in a CreateBlockBatch?
     for (auto& file_writer : _file_writers) {
         Status status = file_writer->close();
@@ -679,7 +698,10 @@ Status BetaRowsetWriter::_do_create_segment_writer(
         path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, _context.rowset_id,
                                                            begin, end);
     } else {
-        segment_id = _num_segment.fetch_add(1) + _segment_start_id;
+        int32_t segid_offset = (flush_ctx != nullptr && flush_ctx->segment_id.has_value())
+                                       ? flush_ctx->segment_id.value()
+                                       : allocate_segment_id();
+        segment_id = segid_offset + _segment_start_id;
         path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id);
     }
     auto fs = _rowset_meta->fs();
@@ -751,6 +773,7 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
                                                int64_t* flush_size) {
     uint32_t segid = (*writer)->get_segment_id();
     uint32_t row_num = (*writer)->num_rows_written();
+    uint32_t segid_offset = segid - _segment_start_id;
 
     if ((*writer)->num_rows_written() == 0) {
         return Status::OK();
@@ -782,8 +805,8 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
         std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
         CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
         _segid_statistics_map.emplace(segid, segstat);
-        _segment_num_rows.resize(_num_segment);
-        _segment_num_rows[_num_segment - 1] = row_num;
+        _segment_num_rows.resize(_next_segment_id);
+        _segment_num_rows[segid_offset] = row_num;
     }
     VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
                << " data_size:" << segment_size << " index_size:" << index_size;
@@ -792,7 +815,13 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
     if (flush_size) {
         *flush_size = segment_size + index_size;
     }
-    _num_flushed_segment.fetch_add(1);
+    {
+        std::lock_guard<std::mutex> lock(_segment_set_mutex);
+        _segment_set.add(segid_offset);
+        while (_segment_set.contains(_num_segment)) {
+            _num_segment++;
+        }
+    }
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 389c1bbede..88219e5941 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -28,6 +28,8 @@
 #include <map>
 #include <memory>
 #include <mutex>
+#include <optional>
+#include <roaring/roaring.hh>
 #include <string>
 #include <unordered_set>
 #include <vector>
@@ -106,6 +108,8 @@ public:
 
     int32_t get_atomic_num_segment() const override { return _num_segment.load(); }
 
+    int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };
+
     // Maybe modified by local schema change
     vectorized::schema_util::LocalSchemaChangeRecorder* mutable_schema_change_recorder() override {
         return _context.schema_change_recorder.get();
@@ -122,6 +126,9 @@ public:
     Status wait_flying_segcompaction() override;
 
 private:
+    Status _do_add_block(const vectorized::Block* block,
+                         std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
+                         size_t row_offset, size_t input_row_num);
     Status _add_block(const vectorized::Block* block,
                       std::unique_ptr<segment_v2::SegmentWriter>* writer,
                       const FlushContext* flush_ctx = nullptr);
@@ -158,8 +165,11 @@ protected:
     RowsetWriterContext _context;
     std::shared_ptr<RowsetMeta> _rowset_meta;
 
-    std::atomic<int32_t> _num_segment;
-    std::atomic<int32_t> _num_flushed_segment;
+    std::atomic<int32_t> _next_segment_id; // the next available segment_id (offset),
+                                           // also the number of allocated segments
+    std::atomic<int32_t> _num_segment;     // number of consecutive flushed segments
+    roaring::Roaring _segment_set;         // bitmap set to record flushed segment id
+    std::mutex _segment_set_mutex;         // mutex for _segment_set
     int32_t _segment_start_id; //basic write start from 0, partial update may be different
     std::atomic<int32_t> _segcompacted_point; // segemnts before this point have
                                               // already been segment compacted
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 6bfa9f6e9a..0ae5fd46c5 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -20,6 +20,8 @@
 #include <gen_cpp/olap_file.pb.h>
 #include <gen_cpp/types.pb.h>
 
+#include <optional>
+
 #include "common/factory_creator.h"
 #include "gutil/macros.h"
 #include "olap/column_mapping.h"
@@ -37,6 +39,7 @@ struct FlushContext {
     ENABLE_FACTORY_CREATOR(FlushContext);
     TabletSchemaSPtr flush_schema = nullptr;
     const vectorized::Block* block = nullptr;
+    std::optional<int32_t> segment_id = std::nullopt;
 };
 
 class RowsetWriter {
@@ -99,6 +102,8 @@ public:
 
     virtual int32_t get_atomic_num_segment() const = 0;
 
+    virtual int32_t allocate_segment_id() = 0;
+
     virtual bool is_doing_segcompaction() const = 0;
 
     virtual Status wait_flying_segcompaction() = 0;
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 9df66651b0..bf003a713c 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -155,6 +155,11 @@ Status VerticalBetaRowsetWriter::flush_columns(bool is_key) {
 Status VerticalBetaRowsetWriter::_create_segment_writer(
         const std::vector<uint32_t>& column_ids, bool is_key,
         std::unique_ptr<segment_v2::SegmentWriter>* writer) {
+    // TODO: this call only keeps DCHECK_EQ(_num_segment, _next_segment_id) passing for now;
+    // we should align the meaning of _num_segment and _next_segment_id with BetaRowsetWriter,
+    // i.e. _next_segment_id means the next available segment id,
+    // and _num_segment means the number of flushed segments.
+    allocate_segment_id();
     auto path =
             BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, _num_segment++);
     auto fs = _rowset_meta->fs();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org