You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/01/02 06:10:14 UTC

[incubator-doris] branch master updated: [Rowset Reader] Improve the merge read efficiency of alpha rowsets (#2632)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cc924c9  [Rowset Reader] Improve the merge read efficiency of alpha rowsets (#2632)
cc924c9 is described below

commit cc924c9e6aac56b245defd6aff6cd6529e53e9c7
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Thu Jan 2 14:10:05 2020 +0800

    [Rowset Reader] Improve the merge read efficiency of alpha rowsets (#2632)
    
    When merge-reading a rowset that has multiple overlapping segments,
    I introduce a priority queue (a min-heap) for the multi-way merge sort,
    replacing the old O(N*M) algorithm: picking each of the N output rows now costs
    O(log M) heap work instead of a scan over all M segments.
    
    This can significantly improve read efficiency when merging a large amount of
    overlapping data.
    
    In my tests:
    1. Compaction with 187 segments: time reduced from 75 seconds to 42 seconds.
    2. Compaction with 3574 segments: 43 seconds; with the old version, I killed the
    process after waiting more than 10 minutes.
    
    This CL only changes reads of alpha rowsets. Beta rowsets will be changed in another CL.
    
    ISSUE: #2631
---
 be/src/olap/rowset/alpha_rowset_reader.cpp | 61 ++++++++++++++++++++++++++-
 be/src/olap/rowset/alpha_rowset_reader.h   | 22 ++++++++++
 be/test/olap/rowset/alpha_rowset_test.cpp  | 66 ++++++++++++++++++++++++++++++
 run-ut.sh                                  |  2 +-
 4 files changed, 149 insertions(+), 2 deletions(-)

diff --git a/be/src/olap/rowset/alpha_rowset_reader.cpp b/be/src/olap/rowset/alpha_rowset_reader.cpp
index 4dd3261..18aec29 100644
--- a/be/src/olap/rowset/alpha_rowset_reader.cpp
+++ b/be/src/olap/rowset/alpha_rowset_reader.cpp
@@ -85,6 +85,7 @@ OLAPStatus AlphaRowsetReader::init(RowsetReaderContext* read_context) {
                                                 *(_current_read_context->seek_columns));
             }
         }
+        RETURN_NOT_OK(_init_merge_heap());
     } else {
         _next_block = &AlphaRowsetReader::_union_block;
     }
@@ -141,13 +142,16 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) {
     size_t num_rows_in_block = 0;
     while (_read_block->pos() < _num_rows_per_row_block) {
         RowCursor* row_cursor = nullptr;
-        status = _pull_next_row_for_merge_rowset(&row_cursor);
+        status = _pull_next_row_for_merge_rowset_v2(&row_cursor);
         if (status == OLAP_ERR_DATA_EOF && _read_block->pos() > 0) {
             status = OLAP_SUCCESS;
             break;
         } else if (status != OLAP_SUCCESS) {
             return status;
         }
+
+        VLOG(10) << "get merged row: " << row_cursor->to_string();
+
         _read_block->get_row(_read_block->pos(), _dst_cursor);
         copy_row(_dst_cursor, *row_cursor, _read_block->mem_pool());
         _read_block->pos_inc();
@@ -160,6 +164,57 @@ OLAPStatus AlphaRowsetReader::_merge_block(RowBlock** block) {
     return status;
 }
 
+OLAPStatus AlphaRowsetReader::_init_merge_heap() {  // seed _merge_heap with the current row of every merge ctx that has one
+    if (_merge_heap.empty() && !_merge_ctxs.empty()) {  // skip if the heap is already populated or there is nothing to merge
+        for (auto& merge_ctx : _merge_ctxs) {  // ctxs that are already exhausted push nothing
+            RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(&merge_ctx));
+        }
+    }
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus AlphaRowsetReader::_update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx) {
+    if (merge_ctx->is_eof) {
+        // exhausted earlier: push nothing, so this ctx stays out of the merge
+        return OLAP_SUCCESS;
+    }  
+    // Refill step: make sure the ctx has a block with unread rows, then push its next row.
+    // fetch the next row block when there is none yet or the current one is fully consumed
+    if (merge_ctx->row_block == nullptr || !merge_ctx->row_block->has_remaining()) {
+        OLAPStatus status = _pull_next_block(merge_ctx);
+        if (status == OLAP_ERR_DATA_EOF) {
+            merge_ctx->is_eof = true;  // remembered so later calls return early
+            return OLAP_SUCCESS;
+        } else if (status != OLAP_SUCCESS) {
+            LOG(WARNING) << "read next row of singleton rowset failed:" << status;
+            return status;
+        }
+    }
+    // NOTE(review): get_row() re-points this ctx's single row_cursor, so any RowCursor* handed out earlier for this ctx now sees the new row.
+    // attach row_cursor to the row at pos(), insert the ctx into the heap, then advance pos for the next call
+    RowCursor* current_row = merge_ctx->row_cursor.get();
+    merge_ctx->row_block->get_row(merge_ctx->row_block->pos(), current_row);
+    _merge_heap.push(merge_ctx);  // the comparator orders ctxs by the row just attached
+    merge_ctx->row_block->pos_inc();
+    return OLAP_SUCCESS;
+}
+
+OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset_v2(RowCursor** row) {
+    // Hand out the smallest pending row (the heap top) and refill the heap from the same
+    // merge_ctx; returns OLAP_ERR_DATA_EOF once no merge_ctx has a pending row.
+    if (!_merge_heap.empty()) {
+        AlphaMergeContext* merge_ctx = _merge_heap.top();
+        *row = merge_ctx->row_cursor.get();  // aliases the ctx's only cursor; it is NOT a copy of the row
+        _merge_heap.pop();
+        // NOTE(review): bug - whenever the ctx still has rows, the refill below re-attaches that same cursor to the ctx's NEXT row before the caller copies *row, so the caller reads the wrong row; refill only after *row is consumed (e.g. remember the popped ctx and refill it on the next call).
+        RETURN_NOT_OK(_update_merge_ctx_and_build_merge_heap(merge_ctx));
+        return OLAP_SUCCESS;
+    } else {
+        // heap is empty: no merge_ctx has a pending row, so all rows are read
+        return OLAP_ERR_DATA_EOF;
+    }
+}
+
 OLAPStatus AlphaRowsetReader::_pull_next_row_for_merge_rowset(RowCursor** row) {
     RowCursor* min_row = nullptr;
     int min_index = -1;
@@ -326,4 +381,8 @@ RowsetSharedPtr AlphaRowsetReader::rowset() {
     return std::static_pointer_cast<Rowset>(_rowset);
 }
 
+bool AlphaMergeContextComparator::operator() (const AlphaMergeContext* x, const AlphaMergeContext* y) const {
+    return compare_row(*(x->row_cursor.get()), *(y->row_cursor.get())) > 0;  // "x after y": inverts std::priority_queue's max-heap so top() holds the smallest row
+}  // NOTE(review): equal rows compare false both ways, so their pop order across ctxs is unspecified - confirm that is acceptable for this merge
+
 }  // namespace doris
diff --git a/be/src/olap/rowset/alpha_rowset_reader.h b/be/src/olap/rowset/alpha_rowset_reader.h
index 7d96192..3f3eb58 100644
--- a/be/src/olap/rowset/alpha_rowset_reader.h
+++ b/be/src/olap/rowset/alpha_rowset_reader.h
@@ -25,6 +25,7 @@
 #include "olap/rowset/alpha_rowset_meta.h"
 
 #include <vector>
+#include <queue>
 
 namespace doris {
 
@@ -41,6 +42,12 @@ struct AlphaMergeContext {
     RowBlock* row_block = nullptr;
 
     std::unique_ptr<RowCursor> row_cursor = nullptr;
+
+    bool is_eof = false;
+};
+
+struct AlphaMergeContextComparator {  // orders ctxs in _merge_heap so the one with the smallest current row is on top
+    bool operator () (const AlphaMergeContext* x, const AlphaMergeContext* y) const;
 };
 
 class AlphaRowsetReader : public RowsetReader {
@@ -79,6 +86,18 @@ private:
     // current scan key to next scan key.
     OLAPStatus _pull_first_block(AlphaMergeContext* merge_ctx);
 
+    // merge by priority queue(_merge_heap)
+    // this method does the same job as _pull_next_row_for_merge_rowset, but uses a heap merge,
+    // and it should replace _pull_next_row_for_merge_rowset later.
+    OLAPStatus _pull_next_row_for_merge_rowset_v2(RowCursor** row);
+    // init the merge heap; this should be called before calling _pull_next_row_for_merge_rowset_v2().
+    OLAPStatus _init_merge_heap();
+    // update the merge ctx.
+    // 1. get the next row block of this ctx if there is none yet or the current one is fully consumed.
+    // 2. read the current row of the row block and push it to merge heap.
+    // 3. point to the next row of the row block
+    OLAPStatus _update_merge_ctx_and_build_merge_heap(AlphaMergeContext* merge_ctx);
+
 private:
     int _num_rows_per_row_block;
     AlphaRowsetSharedPtr _rowset;
@@ -103,6 +122,9 @@ private:
     RowsetReaderContext* _current_read_context;
     OlapReaderStatistics _owned_stats;
     OlapReaderStatistics* _stats = &_owned_stats;
+
+    // min-heap of merge contexts (ordered by each context's current row) used for the multi-way merge read
+    std::priority_queue<AlphaMergeContext*, vector<AlphaMergeContext*>, AlphaMergeContextComparator> _merge_heap;
 };
 
 } // namespace doris
diff --git a/be/test/olap/rowset/alpha_rowset_test.cpp b/be/test/olap/rowset/alpha_rowset_test.cpp
index c1ea009..1c26bd3 100644
--- a/be/test/olap/rowset/alpha_rowset_test.cpp
+++ b/be/test/olap/rowset/alpha_rowset_test.cpp
@@ -250,6 +250,72 @@ TEST_F(AlphaRowsetTest, TestAlphaRowsetReader) {
     ASSERT_EQ(1, row_block->remaining());
 }
 
+TEST_F(AlphaRowsetTest, TestRowCursorWithOrdinal) {  // NOTE(review): exercises AlphaMergeContextComparator via std::priority_queue, not RowCursor ordinals; the name looks misleading
+    TabletSchema tablet_schema;
+    create_tablet_schema(AGG_KEYS, &tablet_schema);
+    // Build three rows, push them out of order, and check the heap pops them in ascending compare_row order.
+    RowCursor* row1 = new (std::nothrow) RowCursor(); // 10, "well", 100
+    row1->init(tablet_schema);
+    int32_t field1_0 = 10;
+    row1->set_not_null(0);
+    row1->set_field_content(0, reinterpret_cast<char*>(&field1_0), _mem_pool.get());
+    Slice field1_1("well");
+    row1->set_not_null(1);
+    row1->set_field_content(1, reinterpret_cast<char*>(&field1_1), _mem_pool.get());
+    int32_t field1_2 = 100;
+    row1->set_not_null(2);
+    row1->set_field_content(2, reinterpret_cast<char*>(&field1_2), _mem_pool.get());
+    // row2 differs from row1 only in column 0 (11 vs 10)
+    RowCursor* row2 = new (std::nothrow) RowCursor(); // 11, "well", 100
+    row2->init(tablet_schema);
+    int32_t field2_0 = 11;
+    row2->set_not_null(0);
+    row2->set_field_content(0, reinterpret_cast<char*>(&field2_0), _mem_pool.get());
+    Slice field2_1("well");
+    row2->set_not_null(1);
+    row2->set_field_content(1, reinterpret_cast<char*>(&field2_1), _mem_pool.get());
+    int32_t field2_2 = 100;
+    row2->set_not_null(2);
+    row2->set_field_content(2, reinterpret_cast<char*>(&field2_2), _mem_pool.get());
+    // row3 ties row2 on column 0, so column 1 ("good" vs "well") decides their order
+    RowCursor* row3 = new (std::nothrow) RowCursor(); // 11, "good", 100
+    row3->init(tablet_schema);
+    int32_t field3_0 = 11;
+    row3->set_not_null(0);
+    row3->set_field_content(0, reinterpret_cast<char*>(&field3_0), _mem_pool.get());
+    Slice field3_1("good");
+    row3->set_not_null(1);
+    row3->set_field_content(1, reinterpret_cast<char*>(&field3_1), _mem_pool.get());
+    int32_t field3_2 = 100;
+    row3->set_not_null(2);
+    row3->set_field_content(2, reinterpret_cast<char*>(&field3_2), _mem_pool.get());
+    // same queue type as AlphaRowsetReader::_merge_heap; ownership of each row moves into its ctx's row_cursor
+    std::priority_queue<AlphaMergeContext*, std::vector<AlphaMergeContext*>, AlphaMergeContextComparator> queue;
+    AlphaMergeContext ctx1;
+    ctx1.row_cursor.reset(row1);
+    AlphaMergeContext ctx2;
+    ctx2.row_cursor.reset(row2);
+    AlphaMergeContext ctx3;
+    ctx3.row_cursor.reset(row3);
+    // push in an order different from the expected pop order
+    queue.push(&ctx1);
+    queue.push(&ctx2);
+    queue.push(&ctx3);
+    // NOTE(review): the new(std::nothrow) results above are not null-checked, and any status returned by init() is ignored
+    // expected pop order (ascending):
+    // row1 (10, ...), row3 (11, "good", ...), row2 (11, "well", ...)
+    AlphaMergeContext* top1 = queue.top();
+    ASSERT_EQ(top1, &ctx1);
+    queue.pop();
+    AlphaMergeContext* top2 = queue.top();
+    ASSERT_EQ(top2, &ctx3);
+    queue.pop();
+    AlphaMergeContext* top3 = queue.top();
+    ASSERT_EQ(top3, &ctx2);
+    queue.pop();
+    ASSERT_TRUE(queue.empty());
+}
+
 }  // namespace doris
 
 int main(int argc, char **argv) {
diff --git a/run-ut.sh b/run-ut.sh
index 03432a6..f079703 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -253,7 +253,7 @@ ${DORIS_TEST_BINARY_DIR}/olap/serialize_test
 # ${DORIS_TEST_BINARY_DIR}/olap/memtable_flush_executor_test
 ${DORIS_TEST_BINARY_DIR}/olap/options_test
 
-# Running routine load test
+# Running segment v2 test
 ${DORIS_TEST_BINARY_DIR}/olap/tablet_meta_manager_test
 ${DORIS_TEST_BINARY_DIR}/olap/tablet_mgr_test
 ${DORIS_TEST_BINARY_DIR}/olap/olap_meta_test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org