You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/07/02 05:56:17 UTC

[incubator-doris] branch master updated: [Performance] Improve performance of unique table read (#3974)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d3d8358  [Performance] Improve performance of  unique table read (#3974)
d3d8358 is described below

commit d3d835844f3e140065f6e621a7a619faa965f074
Author: yangzhg <78...@qq.com>
AuthorDate: Thu Jul 2 13:56:08 2020 +0800

    [Performance] Improve performance of  unique table read (#3974)
    
    Implements #3971
    the test table as list:
    ```
    mysql> desc test;
    +------------+---------+------+-------+---------+---------+
    | Field      | Type    | Null | Key   | Default | Extra   |
    +------------+---------+------+-------+---------+---------+
    | rid        | BIGINT  | No   | true  | 0       |         |
    | qid        | BIGINT  | No   | true  | 0       |         |
    | qidDeleted | TINYINT | No   | false | 0       | REPLACE |
    | type       | TINYINT | No   | false | 0       | REPLACE |
    | uid        | BIGINT  | No   | false | 0       | REPLACE |
    | toUid      | BIGINT  | No   | false | 0       | REPLACE |
    | status     | INT     | No   | false | 0       | REPLACE |
    | createTime | INT     | No   | false | 0       | REPLACE |
    | source     | INT     | No   | false | 0       | REPLACE |
    | misFlag    | INT     | No   | false | 0       | REPLACE |
    | anonymous  | TINYINT | No   | false | 0       | REPLACE |
    | uv         | TINYINT | No   | false | 1       | REPLACE |
    +------------+---------+------+-------+---------+---------+
    12 rows in set (0.00 sec)
    
    mysql> select count(*) from test;
    +----------+
    | count(*) |
    +----------+
    |  1093760 |
    +----------+
    1 row in set (1.00 sec)
    ```
    There is 29 versions at present
    ![image](https://user-images.githubusercontent.com/9098473/85992244-2aa26c80-ba27-11ea-918a-04701a58dbdf.png)
    I run the query `select sum(uv) from test` for 10 times,
    the average ScanTime reduced from `9s277ms`  to `8s206ms`
---
 be/src/olap/reader.cpp | 60 +++++++++++++++++++++++++++-----------------------
 1 file changed, 32 insertions(+), 28 deletions(-)

diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 66de5df..06a0965 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -152,7 +152,12 @@ private:
     // if row cursors equal, compare data version.
     class ChildCtxComparator {
     public:
+        ChildCtxComparator(const bool& revparam=false) {
+            _reverse = revparam;
+        }
         bool operator()(const ChildCtx* a, const ChildCtx* b);
+    private:
+        bool _reverse;
     };
 
     inline OLAPStatus _merge_next(const RowCursor** row, bool* delete_flag);
@@ -172,7 +177,7 @@ private:
     bool _merge = true;
     // used when `_merge == true`
     typedef std::priority_queue<ChildCtx*, std::vector<ChildCtx*>, ChildCtxComparator> MergeHeap;
-    MergeHeap _heap;
+    std::unique_ptr<MergeHeap> _heap;
     // used when `_merge == false`
     int _child_idx = 0;
 
@@ -195,6 +200,11 @@ void CollectIterator::init(Reader* reader) {
             (_reader->_aggregation ||
              _reader->_tablet->keys_type() == KeysType::DUP_KEYS)) {
         _merge = false;
+        _heap.reset(nullptr);
+    } else if (_reader->_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
+        _heap.reset(new MergeHeap(ChildCtxComparator(true)));
+    } else {
+        _heap.reset(new MergeHeap());
     }
 }
 
@@ -208,8 +218,8 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
     ChildCtx* child_ptr = child.release();
     _children.push_back(child_ptr);
     if (_merge) {
-        _heap.push(child_ptr);
-        _cur_child = _heap.top();
+        _heap->push(child_ptr);
+        _cur_child = _heap->top();
     } else {
         if (_cur_child == nullptr) {
             _cur_child = _children[_child_idx];
@@ -219,14 +229,14 @@ OLAPStatus CollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
 }
 
 inline OLAPStatus CollectIterator::_merge_next(const RowCursor** row, bool* delete_flag) {
-    _heap.pop();
+    _heap->pop();
     auto res = _cur_child->next(row, delete_flag);
     if (res == OLAP_SUCCESS) {
-        _heap.push(_cur_child);
-        _cur_child = _heap.top();
+        _heap->push(_cur_child);
+        _cur_child = _heap->top();
     } else if (res == OLAP_ERR_DATA_EOF) {
-        if (!_heap.empty()) {
-            _cur_child = _heap.top();
+        if (!_heap->empty()) {
+            _cur_child = _heap->top();
         } else {
             _cur_child = nullptr;
             return OLAP_ERR_DATA_EOF;
@@ -269,12 +279,18 @@ bool CollectIterator::ChildCtxComparator::operator()(const ChildCtx* a, const Ch
         return cmp_res > 0;
     }
     // if row cursors equal, compare data version.
+    // read data from higher version to lower version.
+    // for UNIQUE_KEYS just read the highest version and no need agg_update.
+    // for AGG_KEYS if a version is deleted, the lower version no need to agg_update
+    if (_reverse) {
+        return a->version() < b->version();
+    }
     return a->version() > b->version();
 }
 
 void CollectIterator::clear() {
-    while (!_heap.empty()) {
-        _heap.pop();
+    while (_heap != nullptr && !_heap->empty()) {
+        _heap->pop();
     }
     for (auto child : _children) {
         delete child;
@@ -396,9 +412,12 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
         }
 
         cur_delete_flag = _next_delete_flag;
-        init_row_with_others(row_cursor, *_next_key, mem_pool, agg_pool);
-
-        int64_t merged_count = 0;
+        // the verion is in reverse order, the first row is the highest version,
+        // in UNIQUE_KEY highest version is the final result, there is no need to
+        // merge the lower versions
+        direct_copy_row(row_cursor, *_next_key);
+        agg_finalize_row(_value_cids, row_cursor, mem_pool);
+        // skip the lower version rows;
         while (nullptr != _next_key) {
             auto res = _collect_iter->next(&_next_key, &_next_delete_flag);
             if (res != OLAP_SUCCESS) {
@@ -408,26 +427,11 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
                 break;
             }
 
-            // we will not do aggregation in two case:
-            //   1. DUP_KEYS keys type has no semantic to aggregate,
-            //   2. to make cost of each scan round reasonable, we will control merged_count.
-            if (_aggregation && merged_count > config::doris_scanner_row_num) {
-                agg_finalize_row(_value_cids, row_cursor, mem_pool);
-                break;
-            }
             // break while can NOT doing aggregation
             if (!equal_row(_key_cids, *row_cursor, *_next_key)) {
-                agg_finalize_row(_value_cids, row_cursor, mem_pool);
                 break;
             }
-
-            cur_delete_flag = _next_delete_flag;
-            agg_update_row(_value_cids, row_cursor, *_next_key);
-            ++merged_count;
         }
-
-        _merged_rows += merged_count;
-
         if (!cur_delete_flag) {
             return OLAP_SUCCESS;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org