You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/01/02 06:10:14 UTC
[incubator-doris] branch master updated: [Rowset Reader] Improve
the merge read efficiency of alpha rowsets (#2632)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new cc924c9 [Rowset Reader] Improve the merge read efficiency of alpha rowsets (#2632)
cc924c9 is described below
commit cc924c9e6aac56b245defd6aff6cd6529e53e9c7
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Jan 2 14:10:05 2020 +0800
[Rowset Reader] Improve the merge read efficiency of alpha rowsets (#2632)
When a merge read covers one rowset with multiple overlapping segments,
I introduce a priority queue (a min-heap data structure) for the multi-way merge sort,
to replace the old algorithm with N*M time complexity.
This can significantly improve read efficiency when merging a large amount of
overlapping data.
In my test:
1. Compaction with 187 segments: time reduced from 75 seconds to 42 seconds.
2. Compaction with 3574 segments: takes 43 seconds; with the old version, I killed the
process after waiting more than 10 minutes...
This CL only changes the reads of alpha rowsets. Beta rowsets will be changed in another CL.
ISSUE: #2631
---
be/src/olap/rowset/alpha_rowset_reader.cpp | 61 ++++++++++++++++++++++++++-
be/src/olap/rowset/alpha_rowset_reader.h | 22 ++++++++++
be/test/olap/rowset/alpha_rowset_test.cpp | 66 ++++++++++++++++++++++++++++++
run-ut.sh | 2 +-
4 files changed, 149 insertions(+), 2 deletions(-)
diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp b/be/src/olap/rowset/alpha_rowset_reader.cpp
index 4dd3261..18aec29 100644
--- a/be/src/olap/rowset/alpha_rowset_reader.cpp
+++ b/be/src/olap/rowset/alpha_rowset_reader.cpp
@@ -85,6 +85,7 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) {
*(_current_read_context->seek_columns));
}
}
+ RETURN_NOT_OK(_init_merge_heap());
} else {
_next_block = &AlphaRowsetReader::_union_block;
}
@@ -141,13 +142,16 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) {
size_t num_rows_in_block = 0;
while (_read_block->pos() < _num_rows_per_row_block) {
RowCursor* row_cursor = nullptr;
- status = _pull_next_row_for_merge_rowset(&row_cursor);
+ status = _pull_next_row_for_merge_rowset_v2(&row_cursor);
if (status == OLAP_ERR_DATA_EOF && _read_block->pos() > 0) {
status = OLAP_SUCCESS;
break;
} else if (status != OLAP_SUCCESS) {
return status;
}
+
+ VLOG(10) << "get merged row: " << row_cursor->to_string();
+
_read_block->get_row(_read_block->pos(), _dst_cursor);
copy_row(_dst_cursor, *row_cursor, _read_block->mem_pool());
_read_block->pos_inc();
@@ -160,6 +164,57 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) {
return status;
}
+OLAPStatus AlphaRowsetReader::_init_merge_heap() { // seed _merge_heap with one entry per merge context
+ if (_merge_heap.empty() && !_merge_ctxs.empty()) { // do nothing if the heap already has entries or there are no merge contexts
+ for (auto& merge_ctx : _merge_ctxs) {
+ RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(&merge_ctx)); // reads the ctx's current row and pushes &merge_ctx, unless the ctx turns out to be at EOF
+ }
+ }
+ return OLAP_SUCCESS; // NOTE(review): the heap stores raw pointers to _merge_ctxs elements; presumably _merge_ctxs must not be resized while the heap is in use -- confirm
+}
+
+OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx) { // read merge_ctx's next row and push merge_ctx onto _merge_heap
+ if (merge_ctx->is_eof) {
+ // this ctx was already marked exhausted: push nothing and report success
+ return OLAP_SUCCESS;
+ }
+
+ // pull a new row block when none is loaded yet or the current one has no rows remaining
+ if (merge_ctx->row_block == nullptr || !merge_ctx->row_block->has_remaining()) {
+ OLAPStatus status = _pull_next_block(merge_ctx);
+ if (status == OLAP_ERR_DATA_EOF) {
+ merge_ctx->is_eof = true; // remember exhaustion; EOF of one ctx is reported as success, the ctx is simply not pushed
+ return OLAP_SUCCESS;
+ } else if (status != OLAP_SUCCESS) {
+ LOG(WARNING) << "read next row of singleton rowset failed:" << status;
+ return status; // any other failure is propagated to the caller
+ }
+ }
+
+ // read the row at the block's current position through the ctx's cursor, push the ctx into the merge heap, and step forward
+ RowCursor* current_row = merge_ctx->row_cursor.get();
+ merge_ctx->row_block->get_row(merge_ctx->row_block->pos(), current_row); // NOTE(review): reuses the single per-ctx cursor; a pointer to it handed out earlier presumably now sees this row -- confirm
+ _merge_heap.push(merge_ctx); // AlphaMergeContextComparator orders heap entries by the row in row_cursor
+ merge_ctx->row_block->pos_inc(); // the block position now points past the row held by the cursor
+ return OLAP_SUCCESS;
+}
+
+OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset_v2(RowCursor** row) { // heap-based variant: outputs the next merged row via *row, or returns OLAP_ERR_DATA_EOF when the heap is empty
+ // if _merge_heap is not empty, output the row of the top ctx via *row, then
+ // refill the heap with the next row from the corresponding merge_ctx
+ if (!_merge_heap.empty()) {
+ AlphaMergeContext* merge_ctx = _merge_heap.top();
+ *row = merge_ctx->row_cursor.get(); // hands out the ctx's own cursor, not a copy of the row
+ _merge_heap.pop();
+
+ RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(merge_ctx)); // NOTE(review): the refill calls get_row() on the same cursor that *row points to; if get_row re-points the cursor, the caller will copy the NEXT row instead of the popped one -- confirm
+ return OLAP_SUCCESS;
+ } else {
+ // heap is empty: every merge ctx is exhausted, all rows are read
+ return OLAP_ERR_DATA_EOF;
+ }
+}
+
OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset(RowCursor** row) {
RowCursor* min_row = nullptr;
int min_index = -1;
@@ -326,4 +381,8 @@ RowsetSharedPtr AlphaRowsetReader::rowset() {
return std::static_pointer_cast<Rowset>(_rowset);
}
+bool AlphaMergeContextComparator::operator() (const AlphaMergeContext* x, const AlphaMergeContext* y) const { // heap ordering predicate over the rows held by two merge contexts
+ return compare_row(*(x->row_cursor.get()), *(y->row_cursor.get())) > 0; // true when x's row compares greater than y's; with std::priority_queue this keeps the smallest row on top
+}
+
} // namespace doris
diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h
index 7d96192..3f3eb58 100644
--- a/be/src/olap/rowset/alpha_rowset_reader.h
+++ b/be/src/olap/rowset/alpha_rowset_reader.h
@@ -25,6 +25,7 @@
#include "olap/rowset/alpha_rowset_meta.h"
#include <vector>
+#include <queue>
namespace doris {
@@ -41,6 +42,12 @@ struct AlphaMergeContext {
RowBlock* row_block = nullptr;
std::unique_ptr<RowCursor> row_cursor = nullptr;
+
+ bool is_eof = false;
+};
+
+struct AlphaMergeContextComparator { // comparator type used by the priority queue of AlphaMergeContext pointers
+ bool operator () (const AlphaMergeContext* x, const AlphaMergeContext* y) const; // compares the rows referenced by x->row_cursor and y->row_cursor; defined in alpha_rowset_reader.cpp
};
class AlphaRowsetReader : public RowsetReader {
@@ -79,6 +86,18 @@ private:
// current scan key to next scan key.
OLAPStatus _pull_first_block(AlphaMergeContext* merge_ctx);
+ // merge by priority queue (_merge_heap).
+ // this method does the same job as _pull_next_row_for_merge_rowset, but uses a heap merge,
+ // and it should replace _pull_next_row_for_merge_rowset later.
+ OLAPStatus _pull_next_row_for_merge_rowset_v2(RowCursor** row);
+ // init the merge heap; this must be called before calling _pull_next_row_for_merge_rowset_v2();
+ OLAPStatus _init_merge_heap();
+ // update the merge ctx:
+ // 1. get the next row block of this ctx, if there is no current row block or it has no rows remaining.
+ // 2. read the current row of the row block and push the ctx into the merge heap.
+ // 3. point to the next row of the row block.
+ OLAPStatus _update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx);
+
private:
int _num_rows_per_row_block;
AlphaRowsetSharedPtr _rowset;
@@ -103,6 +122,9 @@ private:
RowsetReaderContext* _current_read_context;
OlapReaderStatistics _owned_stats;
OlapReaderStatistics* _stats = &_owned_stats;
+
+ // a priority queue for merging rowsets
+ std::priority_queue<AlphaMergeContext*, vector<AlphaMergeContext*>, AlphaMergeContextComparator> _merge_heap;
};
} // namespace doris
diff --git a/be/test/olap/rowset/alpha_rowset_test.cpp b/be/test/olap/rowset/alpha_rowset_test.cpp
index c1ea009..1c26bd3 100644
--- a/be/test/olap/rowset/alpha_rowset_test.cpp
+++ b/be/test/olap/rowset/alpha_rowset_test.cpp
@@ -250,6 +250,72 @@ TEST_F(AlphaRowsetTest, TestAlphaRowsetReader) {
ASSERT_EQ(1, row_block->remaining());
}
+TEST_F(AlphaRowsetTest, TestRowCursorWithOrdinal) { // checks that a priority queue using AlphaMergeContextComparator pops contexts in ascending row order
+ TabletSchema tablet_schema;
+ create_tablet_schema(AGG_KEYS, &tablet_schema);
+
+ RowCursor* row1 = new (std::nothrow) RowCursor(); // row1 = (10, "well", 100)
+ row1->init(tablet_schema); // NOTE(review): no null check after new (std::nothrow), and any result of init() is ignored
+ int32_t field1_0 = 10;
+ row1->set_not_null(0);
+ row1->set_field_content(0, reinterpret_cast<char*>(&field1_0), _mem_pool.get());
+ Slice field1_1("well");
+ row1->set_not_null(1);
+ row1->set_field_content(1, reinterpret_cast<char*>(&field1_1), _mem_pool.get());
+ int32_t field1_2 = 100;
+ row1->set_not_null(2);
+ row1->set_field_content(2, reinterpret_cast<char*>(&field1_2), _mem_pool.get());
+
+ RowCursor* row2 = new (std::nothrow) RowCursor(); // row2 = (11, "well", 100)
+ row2->init(tablet_schema);
+ int32_t field2_0 = 11;
+ row2->set_not_null(0);
+ row2->set_field_content(0, reinterpret_cast<char*>(&field2_0), _mem_pool.get());
+ Slice field2_1("well");
+ row2->set_not_null(1);
+ row2->set_field_content(1, reinterpret_cast<char*>(&field2_1), _mem_pool.get());
+ int32_t field2_2 = 100;
+ row2->set_not_null(2);
+ row2->set_field_content(2, reinterpret_cast<char*>(&field2_2), _mem_pool.get());
+
+ RowCursor* row3 = new (std::nothrow) RowCursor(); // row3 = (11, "good", 100)
+ row3->init(tablet_schema);
+ int32_t field3_0 = 11;
+ row3->set_not_null(0);
+ row3->set_field_content(0, reinterpret_cast<char*>(&field3_0), _mem_pool.get());
+ Slice field3_1("good");
+ row3->set_not_null(1);
+ row3->set_field_content(1, reinterpret_cast<char*>(&field3_1), _mem_pool.get());
+ int32_t field3_2 = 100;
+ row3->set_not_null(2);
+ row3->set_field_content(2, reinterpret_cast<char*>(&field3_2), _mem_pool.get());
+
+ std::priority_queue<AlphaMergeContext*, std::vector<AlphaMergeContext*>, AlphaMergeContextComparator> queue; // same queue type as AlphaRowsetReader::_merge_heap
+ AlphaMergeContext ctx1;
+ ctx1.row_cursor.reset(row1); // each ctx's unique_ptr takes ownership of its raw RowCursor
+ AlphaMergeContext ctx2;
+ ctx2.row_cursor.reset(row2);
+ AlphaMergeContext ctx3;
+ ctx3.row_cursor.reset(row3);
+
+ queue.push(&ctx1);
+ queue.push(&ctx2);
+ queue.push(&ctx3);
+
+ // expected pop order, smallest row first:
+ // row1 (10, "well"), row3 (11, "good"), row2 (11, "well")
+ AlphaMergeContext* top1 = queue.top();
+ ASSERT_EQ(top1, &ctx1);
+ queue.pop();
+ AlphaMergeContext* top2 = queue.top();
+ ASSERT_EQ(top2, &ctx3);
+ queue.pop();
+ AlphaMergeContext* top3 = queue.top();
+ ASSERT_EQ(top3, &ctx2);
+ queue.pop();
+ ASSERT_TRUE(queue.empty());
+}
+
} // namespace doris
int main(int argc, char **argv) {
diff --git a/run-ut.sh b/run-ut.sh
index 03432a6..f079703 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -253,7 +253,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/serialize_test
# ${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test
${DORIS_TEST_BINARY_DIR}/olap/options_test
-# Running routine load test
+# Running segment v2 test
${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test
${DORIS_TEST_BINARY_DIR}/olap/tablet_mgr_test
${DORIS_TEST_BINARY_DIR}/olap/olap_meta_test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org