You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/06/02 05:50:02 UTC
[doris] branch master updated: [performance](load) support parallel memtable flush for unique key tables (#20308)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a869056567 [performance](load) support parallel memtable flush for unique key tables (#20308)
a869056567 is described below
commit a869056567f5363d2cbbb9bdd8e8007fcd4138b6
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Fri Jun 2 13:49:53 2023 +0800
[performance](load) support parallel memtable flush for unique key tables (#20308)
---
be/src/olap/delta_writer.cpp | 10 ++--
be/src/olap/memtable.cpp | 11 ++--
be/src/olap/memtable.h | 5 ++
be/src/olap/rowset/beta_rowset_writer.cpp | 59 ++++++++++++++++------
be/src/olap/rowset/beta_rowset_writer.h | 14 ++++-
be/src/olap/rowset/rowset_writer.h | 5 ++
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 5 ++
7 files changed, 86 insertions(+), 23 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 4a7a5760a1..8386847894 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -209,9 +209,9 @@ Status DeltaWriter::init() {
_reset_mem_table();
// create flush handler
- // unique key should flush serial because we need to make sure same key should sort
- // in the same order in all replica.
- bool should_serial = _tablet->keys_type() == KeysType::UNIQUE_KEYS;
+ // by assigning segment_id to memtable before submitting to flush executor,
+ // we can make sure same keys sort in the same order in all replicas.
+ bool should_serial = false;
RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
&_flush_token, _rowset_writer->type(), should_serial, _req.is_high_priority));
@@ -267,6 +267,10 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
Status DeltaWriter::_flush_memtable_async() {
+ if (_mem_table->empty()) {
+ return Status::OK();
+ }
+ _mem_table->assign_segment_id();
return _flush_token->submit(std::move(_mem_table));
}
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c398a22b15..cd4db46d20 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -418,12 +418,12 @@ Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flus
if (!_tablet->enable_unique_key_merge_on_write()) {
return Status::OK();
}
- auto rowset = _rowset_writer->build_tmp();
- auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
- std::vector<segment_v2::SegmentSharedPtr> segments;
if (atomic_num_segments_before_flush >= atomic_num_segments_after_flush) {
return Status::OK();
}
+ auto rowset = _rowset_writer->build_tmp();
+ auto beta_rowset = reinterpret_cast<BetaRowset*>(rowset.get());
+ std::vector<segment_v2::SegmentSharedPtr> segments;
RETURN_IF_ERROR(beta_rowset->load_segments(atomic_num_segments_before_flush,
atomic_num_segments_after_flush, &segments));
std::shared_lock meta_rlock(_tablet->get_header_lock());
@@ -483,11 +483,16 @@ Status MemTable::_do_flush() {
// Unfold variant column
RETURN_IF_ERROR(unfold_variant_column(block, &ctx));
}
+ ctx.segment_id = _segment_id;
SCOPED_RAW_TIMER(&_stat.segment_writer_ns);
RETURN_IF_ERROR(_rowset_writer->flush_single_memtable(&block, &_flush_size, &ctx));
return Status::OK();
}
+void MemTable::assign_segment_id() {
+ _segment_id = std::optional<int32_t> {_rowset_writer->allocate_segment_id()};
+}
+
Status MemTable::close() {
return flush();
}
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 95d41df62d..d6acc5211c 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -22,6 +22,7 @@
#include <functional>
#include <memory>
+#include <optional>
#include <ostream>
#include <vector>
@@ -145,6 +146,9 @@ public:
_delta_writer_callback = callback;
}
+ bool empty() const { return _input_mutable_block.rows() == 0; }
+ void assign_segment_id();
+
private:
Status _do_flush();
@@ -225,6 +229,7 @@ private:
void _put_into_output(vectorized::Block& in_block);
bool _is_first_insertion;
std::function<void(MemTableStat&)> _delta_writer_callback;
+ std::optional<int32_t> _segment_id = std::nullopt;
void _init_agg_functions(const vectorized::Block* block);
std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 5358732daa..e75075d27f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -60,8 +60,8 @@ using namespace ErrorCode;
BetaRowsetWriter::BetaRowsetWriter()
: _rowset_meta(nullptr),
+ _next_segment_id(0),
_num_segment(0),
- _num_flushed_segment(0),
_segment_start_id(0),
_segcompacted_point(0),
_num_segcompacted(0),
@@ -89,9 +89,10 @@ BetaRowsetWriter::~BetaRowsetWriter() {
if (!fs) {
return;
}
- for (int i = 0; i < _num_segment; ++i) {
- std::string seg_path =
- BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, i);
+ auto max_segment_id = std::max(_num_segment.load(), _next_segment_id.load());
+ for (int i = 0; i < max_segment_id; ++i) {
+ std::string seg_path = BetaRowset::segment_file_path(
+ _context.rowset_dir, _context.rowset_id, _segment_start_id + i);
// Even if an error is encountered, these files that have not been cleaned up
// will be cleaned up by the GC background. So here we only print the error
// message when we encounter an error.
@@ -357,8 +358,7 @@ Status BetaRowsetWriter::_segcompaction_if_necessary() {
if (_segcompaction_status.load() != OK) {
status = Status::Error<SEGCOMPACTION_FAILED>();
} else if ((_num_segment - _segcompacted_point) >=
- config::segcompaction_threshold_segment_num &&
- _num_segment == _num_flushed_segment) /* no segment in the middle is flushing */ {
+ config::segcompaction_threshold_segment_num) {
SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>();
status = _get_segcompaction_candidates(segments, false);
if (LIKELY(status.ok()) && (segments->size() > 0)) {
@@ -408,6 +408,17 @@ Status BetaRowsetWriter::_segcompaction_ramaining_if_necessary() {
return status;
}
+Status BetaRowsetWriter::_do_add_block(const vectorized::Block* block,
+ std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
+ size_t row_offset, size_t input_row_num) {
+ auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
+ if (UNLIKELY(!s.ok())) {
+ LOG(WARNING) << "failed to append block: " << s.to_string();
+ return Status::Error<WRITER_DATA_WRITE_ERROR>();
+ }
+ return Status::OK();
+}
+
Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
const FlushContext* flush_ctx) {
@@ -416,6 +427,13 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num);
size_t row_offset = 0;
+ if (flush_ctx != nullptr && flush_ctx->segment_id.has_value()) {
+ // the entire block (memtable) should be flushed into single segment
+ RETURN_IF_ERROR(_do_add_block(block, segment_writer, 0, block_row_num));
+ _raw_num_rows_written += block_row_num;
+ return Status::OK();
+ }
+
do {
auto max_row_add = (*segment_writer)->max_row_to_add(row_avg_size_in_bytes);
if (UNLIKELY(max_row_add < 1)) {
@@ -426,11 +444,7 @@ Status BetaRowsetWriter::_add_block(const vectorized::Block* block,
DCHECK(max_row_add > 0);
}
size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add));
- auto s = (*segment_writer)->append_block(block, row_offset, input_row_num);
- if (UNLIKELY(!s.ok())) {
- LOG(WARNING) << "failed to append block: " << s.to_string();
- return Status::Error<WRITER_DATA_WRITE_ERROR>();
- }
+ RETURN_IF_ERROR(_do_add_block(block, segment_writer, row_offset, input_row_num));
row_offset += input_row_num;
} while (row_offset < block_row_num);
@@ -445,6 +459,9 @@ Status BetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
_total_data_size += rowset->rowset_meta()->data_disk_size();
_total_index_size += rowset->rowset_meta()->index_disk_size();
_num_segment += rowset->num_segments();
+ // _next_segment_id is not used in this code path,
+ // just to make sure it matches with _num_segment
+ _next_segment_id = _num_segment.load();
// append key_bounds to current rowset
rowset->get_segments_key_bounds(&_segments_encoded_key_bounds);
// TODO update zonemap
@@ -515,6 +532,8 @@ RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_r
}
RowsetSharedPtr BetaRowsetWriter::build() {
+ // make sure all segments are flushed
+ DCHECK_EQ(_num_segment, _next_segment_id);
// TODO(lingbin): move to more better place, or in a CreateBlockBatch?
for (auto& file_writer : _file_writers) {
Status status = file_writer->close();
@@ -679,7 +698,10 @@ Status BetaRowsetWriter::_do_create_segment_writer(
path = BetaRowset::local_segment_path_segcompacted(_context.rowset_dir, _context.rowset_id,
begin, end);
} else {
- segment_id = _num_segment.fetch_add(1) + _segment_start_id;
+ int32_t segid_offset = (flush_ctx != nullptr && flush_ctx->segment_id.has_value())
+ ? flush_ctx->segment_id.value()
+ : allocate_segment_id();
+ segment_id = segid_offset + _segment_start_id;
path = BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, segment_id);
}
auto fs = _rowset_meta->fs();
@@ -751,6 +773,7 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
int64_t* flush_size) {
uint32_t segid = (*writer)->get_segment_id();
uint32_t row_num = (*writer)->num_rows_written();
+ uint32_t segid_offset = segid - _segment_start_id;
if ((*writer)->num_rows_written() == 0) {
return Status::OK();
@@ -782,8 +805,8 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
_segid_statistics_map.emplace(segid, segstat);
- _segment_num_rows.resize(_num_segment);
- _segment_num_rows[_num_segment - 1] = row_num;
+ _segment_num_rows.resize(_next_segment_id);
+ _segment_num_rows[segid_offset] = row_num;
}
VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
<< " data_size:" << segment_size << " index_size:" << index_size;
@@ -792,7 +815,13 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
if (flush_size) {
*flush_size = segment_size + index_size;
}
- _num_flushed_segment.fetch_add(1);
+ {
+ std::lock_guard<std::mutex> lock(_segment_set_mutex);
+ _segment_set.add(segid_offset);
+ while (_segment_set.contains(_num_segment)) {
+ _num_segment++;
+ }
+ }
return Status::OK();
}
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 389c1bbede..88219e5941 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -28,6 +28,8 @@
#include <map>
#include <memory>
#include <mutex>
+#include <optional>
+#include <roaring/roaring.hh>
#include <string>
#include <unordered_set>
#include <vector>
@@ -106,6 +108,8 @@ public:
int32_t get_atomic_num_segment() const override { return _num_segment.load(); }
+ int32_t allocate_segment_id() override { return _next_segment_id.fetch_add(1); };
+
// Maybe modified by local schema change
vectorized::schema_util::LocalSchemaChangeRecorder* mutable_schema_change_recorder() override {
return _context.schema_change_recorder.get();
@@ -122,6 +126,9 @@ public:
Status wait_flying_segcompaction() override;
private:
+ Status _do_add_block(const vectorized::Block* block,
+ std::unique_ptr<segment_v2::SegmentWriter>* segment_writer,
+ size_t row_offset, size_t input_row_num);
Status _add_block(const vectorized::Block* block,
std::unique_ptr<segment_v2::SegmentWriter>* writer,
const FlushContext* flush_ctx = nullptr);
@@ -158,8 +165,11 @@ protected:
RowsetWriterContext _context;
std::shared_ptr<RowsetMeta> _rowset_meta;
- std::atomic<int32_t> _num_segment;
- std::atomic<int32_t> _num_flushed_segment;
+ std::atomic<int32_t> _next_segment_id; // the next available segment_id (offset),
+ // also the number of allocated segments
+ std::atomic<int32_t> _num_segment; // number of consecutive flushed segments
+ roaring::Roaring _segment_set; // bitmap set to record flushed segment id
+ std::mutex _segment_set_mutex; // mutex for _segment_set
int32_t _segment_start_id; //basic write start from 0, partial update may be different
std::atomic<int32_t> _segcompacted_point; // segemnts before this point have
// already been segment compacted
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 6bfa9f6e9a..0ae5fd46c5 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -20,6 +20,8 @@
#include <gen_cpp/olap_file.pb.h>
#include <gen_cpp/types.pb.h>
+#include <optional>
+
#include "common/factory_creator.h"
#include "gutil/macros.h"
#include "olap/column_mapping.h"
@@ -37,6 +39,7 @@ struct FlushContext {
ENABLE_FACTORY_CREATOR(FlushContext);
TabletSchemaSPtr flush_schema = nullptr;
const vectorized::Block* block = nullptr;
+ std::optional<int32_t> segment_id = std::nullopt;
};
class RowsetWriter {
@@ -99,6 +102,8 @@ public:
virtual int32_t get_atomic_num_segment() const = 0;
+ virtual int32_t allocate_segment_id() = 0;
+
virtual bool is_doing_segcompaction() const = 0;
virtual Status wait_flying_segcompaction() = 0;
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index 9df66651b0..bf003a713c 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -155,6 +155,11 @@ Status VerticalBetaRowsetWriter::flush_columns(bool is_key) {
Status VerticalBetaRowsetWriter::_create_segment_writer(
const std::vector<uint32_t>& column_ids, bool is_key,
std::unique_ptr<segment_v2::SegmentWriter>* writer) {
+ // TODO: just to pass DCHECK for now; we should align the meaning
+ // of _num_segment and _next_segment_id with BetaRowsetWriter.
+ // i.e. _next_segment_id means next available segment id,
+ // and _num_segment means num of flushed segments.
+ allocate_segment_id();
auto path =
BetaRowset::segment_file_path(_context.rowset_dir, _context.rowset_id, _num_segment++);
auto fs = _rowset_meta->fs();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org