You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2019/12/26 14:00:22 UTC

[incubator-doris] branch master updated: [Segment V2] Support lazy-materialization-read (#2547)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 11f8d54  [Segment V2] Support lazy-materialization-read (#2547)
11f8d54 is described below

commit 11f8d542db19f638a71b5065e00ca9057d7f8ea9
Author: Dayue Gao <ga...@meituan.com>
AuthorDate: Thu Dec 26 22:00:16 2019 +0800

    [Segment V2] Support lazy-materialization-read (#2547)
    
    Current read path of SegmentIterator
    ----
    
    1. apply short key index and various column indexes to get the row ranges (ordinals of rows) to scan
    2. read all return columns according to the row ranges
    3. evaluate column predicates on the RowBlockV2 to further prune rows
    
    Problem
    ----
    
    When the column predicates at step 3 filter out a large proportion of the rows in RowBlockV2, most values of the non-predicate columns read at step 2 are thrown away, i.e., we do a lot of useless work and I/O at step 2.
    
    Lazy materialization read
    ----
    With lazy materialization, the read path changes to
    1. apply short key index and various column indexes to get the row ranges (ordinals of rows) to scan (unchanged)
    2. **read only predicate columns** according to the row ranges
    3. evaluate column predicates on the RowBlockV2 to further prune rows; a selection vector is maintained to indicate the selected rows
    4. **read the remaining columns** based on the *selection vector* of RowBlockV2
    
    In this way, we avoid reading the values of non-predicate columns for rows that do not pass the predicates.
    
    Example
    ----
    ```
    function: seek(ordinal), read(block_offset, count)
    
    (step 1) row ranges: [0,2),[4,8),[10,11),[15,20)
    (step 1) row ordinals: [0 1 4 5 6 7 10 15 16 17 18 19]
    (step 2) read of predicate columns: seek(0),read(0,2),seek(4),read(2,4),seek(10),read(6,1),seek(15),read(7,5)
    (step 3) selection vector: [3 4 5 6]
    (step 3) selected ordinals: [5 6 7 10]
    (step 4) read of remaining columns: seek(5),read(3,3),seek(10),read(6,1)
    ```
    
    Performance evaluation
    ----
    Lazy materialization is particularly useful when the column predicates filter out many rows and many large metric columns (e.g., hll and bitmap type columns) are queried. In our internal test cases on bitmap columns, queries run 20%~120% faster when using lazy materialization.
---
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 212 ++++++++++++++++-----
 be/src/olap/rowset/segment_v2/segment_iterator.h   |  82 +++-----
 2 files changed, 184 insertions(+), 110 deletions(-)

diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 805f660..b7b7c6f 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -34,6 +34,63 @@ using strings::Substitute;
 namespace doris {
 namespace segment_v2 {
 
+// A fast range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to).
+// Example:
+//   input bitmap:  [0 1 4 5 6 7 10 15 16 17 18 19]
+//   output ranges: [0,2), [4,8), [10,11), [15,20) (when max_range_size=10)
+//   output ranges: [0,2), [4,7), [7,8), [10,11), [15,18), [18,20) (when max_range_size=3)
+class SegmentIterator::BitmapRangeIterator {
+public:
+    explicit BitmapRangeIterator(const Roaring& bitmap) {
+        roaring_init_iterator(&bitmap.roaring, &_iter);
+        _last_val = 0;
+        _buf = new uint32_t[256];
+        _read_next_batch();
+    }
+
+    ~BitmapRangeIterator() {
+        delete[] _buf;
+    }
+
+    bool has_more_range() const { return !_eof; }
+
+    // read next range into [*from, *to) whose size <= max_range_size.
+    // return false when there is no more range.
+    bool next_range(uint32_t max_range_size, uint32_t* from, uint32_t* to) {
+        if (_eof) {
+            return false;
+        }
+        *from = _buf[_buf_pos];
+        uint32_t range_size = 0;
+        do {
+            _last_val = _buf[_buf_pos];
+            _buf_pos++;
+            range_size++;
+            if (_buf_pos == _buf_size) { // read next batch
+                _read_next_batch();
+            }
+        } while (range_size < max_range_size && !_eof && _buf[_buf_pos] == _last_val + 1);
+        *to = *from + range_size;
+        return true;
+    }
+
+private:
+    void _read_next_batch() {
+        uint32_t n = roaring_read_uint32_iterator(&_iter, _buf, kBatchSize);
+        _buf_pos = 0;
+        _buf_size = n;
+        _eof = n == 0;
+    }
+
+    static const uint32_t kBatchSize = 256;
+    roaring_uint32_iterator_t _iter;
+    uint32_t _last_val;
+    uint32_t* _buf = nullptr;
+    uint32_t _buf_pos;
+    uint32_t _buf_size;
+    bool _eof;
+};
+
 SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment,
                                  const Schema& schema)
     : _segment(std::move(segment)),
@@ -41,6 +98,7 @@ SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment,
       _column_iterators(_schema.num_columns(), nullptr),
       _bitmap_index_iterators(_schema.num_columns(), nullptr),
       _cur_rowid(0),
+      _lazy_materialization_read(false),
       _inited(false) {
 }
 
@@ -53,13 +111,22 @@ SegmentIterator::~SegmentIterator() {
     }
 }
 
+Status SegmentIterator::init(const StorageReadOptions& opts) {
+    _opts = opts;
+    if (opts.column_predicates != nullptr) {
+        _col_predicates = *(opts.column_predicates);
+    }
+    return Status::OK();
+}
+
 Status SegmentIterator::_init() {
     DorisMetrics::segment_read_total.increment(1);
     _row_bitmap.addRange(0, _segment->num_rows());
-    RETURN_IF_ERROR(_init_column_iterators());
+    RETURN_IF_ERROR(_init_return_column_iterators());
     RETURN_IF_ERROR(_init_bitmap_index_iterators());
     RETURN_IF_ERROR(_get_row_ranges_by_keys());
     RETURN_IF_ERROR(_get_row_ranges_by_column_conditions());
+    _init_lazy_materialization();
     _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
     return Status::OK();
 }
@@ -134,19 +201,11 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
 
 Status SegmentIterator::_get_row_ranges_by_column_conditions() {
     if (_row_bitmap.isEmpty()) {
-        // no data just return;
         return Status::OK();
     }
+    RETURN_IF_ERROR(_apply_bitmap_index());
 
-    if (_opts.column_predicates != nullptr) {
-        Roaring bitmap;
-        RETURN_IF_ERROR(_get_row_ranges_from_bitmap_index(&bitmap));
-        size_t pre_size = _row_bitmap.cardinality();
-        _row_bitmap &= bitmap;
-        _opts.stats->bitmap_index_filter_count += (pre_size - _row_bitmap.cardinality());
-    }
-
-    if (_opts.conditions != nullptr || _opts.delete_conditions.size() > 0) {
+    if (!_row_bitmap.isEmpty() && (_opts.conditions != nullptr || _opts.delete_conditions.size() > 0)) {
         RowRanges condition_row_ranges;
         RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
         size_t pre_size = _row_bitmap.cardinality();
@@ -195,18 +254,30 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
     return Status::OK();
 }
 
-Status SegmentIterator::_get_row_ranges_from_bitmap_index(Roaring* row_bitmap) {
+// filter rows by evaluating column predicates using bitmap indexes.
+// upon return, predicates that've been evaluated by bitmap indexes are removed from _col_predicates.
+Status SegmentIterator::_apply_bitmap_index() {
     SCOPED_RAW_TIMER(&_opts.stats->bitmap_index_filter_timer);
-    Roaring bitmap;
-    bitmap.addRange(0, _segment->num_rows());
-    for (auto pred: *_opts.column_predicates) {
-        RETURN_IF_ERROR(pred->evaluate(_schema, _bitmap_index_iterators, _segment->num_rows(), &bitmap));
+    size_t input_rows = _row_bitmap.cardinality();
+    std::vector<ColumnPredicate*> remaining_predicates;
+
+    for (auto pred : _col_predicates) {
+        if (_bitmap_index_iterators[pred->column_id()] == nullptr) {
+            // no bitmap index for this column
+            remaining_predicates.push_back(pred);
+        } else {
+            RETURN_IF_ERROR(pred->evaluate(_schema, _bitmap_index_iterators, _segment->num_rows(), &_row_bitmap));
+            if (_row_bitmap.isEmpty()) {
+                break; // all rows have been pruned, no need to process further predicates
+            }
+        }
     }
-    *row_bitmap = std::move(bitmap);
+    _col_predicates = std::move(remaining_predicates);
+    _opts.stats->bitmap_index_filter_count += (input_rows - _row_bitmap.cardinality());
     return Status::OK();
 }
 
-Status SegmentIterator::_init_column_iterators() {
+Status SegmentIterator::_init_return_column_iterators() {
     if (_cur_rowid >= num_rows()) {
         return Status::OK();
     }
@@ -314,34 +385,28 @@ Status SegmentIterator::_seek_and_peek(rowid_t rowid) {
     // please note that usually RowBlockV2.clear() is called to free MemPool memory before reading the next block,
     // but here since there won't be too many keys to seek, we don't call RowBlockV2.clear() so that we can use
     // a single MemPool for all seeked keys.
-    RETURN_IF_ERROR(_next_batch(_seek_block.get(), 0, &num_rows));
+    RETURN_IF_ERROR(_read_columns(_seek_schema->column_ids(), _seek_block.get(), 0, num_rows));
     _seek_block->set_num_rows(num_rows);
     return Status::OK();
 }
 
-// Trying to read `rows_read` rows into `block` at the given offset.
-// Return the actual number of rows read in `*rows_read`.
-Status SegmentIterator::_next_batch(RowBlockV2* block, size_t row_offset, size_t* rows_read) {
-    bool has_read = false;
-    size_t first_read = 0;
-    for (auto cid : block->schema()->column_ids()) {
-        size_t num_rows = has_read ? first_read : *rows_read;
-        auto column_block = block->column_block(cid);
-        ColumnBlockView dst(&column_block, row_offset);
-        RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&num_rows, &dst));
-        block->set_delete_state(column_block.delete_state());
-        if (!has_read) {
-            has_read = true;
-            first_read = num_rows;
-        } else if (num_rows != first_read) {
-            return Status::InternalError(
-                Substitute("Read different rows in different columns"
-                           ", column($0) read $1 vs column($2) read $3",
-                           block->schema()->column_ids()[0], first_read, cid, num_rows));
+void SegmentIterator::_init_lazy_materialization() {
+    if (!_col_predicates.empty()) {
+        std::set<ColumnId> predicate_columns;
+        for (auto predicate : _col_predicates) {
+            predicate_columns.insert(predicate->column_id());
+        }
+        // when all return columns have predicates, disable lazy materialization to avoid its overhead
+        if (_schema.column_ids().size() > predicate_columns.size()) {
+            _lazy_materialization_read = true;
+            _predicate_columns.assign(predicate_columns.cbegin(), predicate_columns.cend());
+            for (auto cid : _schema.column_ids()) {
+                if (predicate_columns.find(cid) == predicate_columns.end()) {
+                    _non_predicate_columns.push_back(cid);
+                }
+            }
         }
     }
-    *rows_read = first_read;
-    return Status::OK();
 }
 
 Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos) {
@@ -353,16 +418,38 @@ Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, r
     return Status::OK();
 }
 
+Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
+                                      RowBlockV2* block,
+                                      size_t row_offset,
+                                      size_t nrows) {
+    for (auto cid : column_ids) {
+        auto column_block = block->column_block(cid);
+        ColumnBlockView dst(&column_block, row_offset);
+        size_t rows_read = nrows;
+        RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, &dst));
+        block->set_delete_state(column_block.delete_state());
+        DCHECK_EQ(nrows, rows_read);
+    }
+    return Status::OK();
+}
+
 Status SegmentIterator::next_batch(RowBlockV2* block) {
     SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
     if (UNLIKELY(!_inited)) {
         RETURN_IF_ERROR(_init());
+        if (_lazy_materialization_read) {
+            _block_rowids.reserve(block->capacity());
+        }
         _inited = true;
     }
 
     uint32_t total_read = 0;
     uint32_t remaining = block->capacity();
-    // trying to fill in block
+    _block_rowids.resize(0);
+    const auto& read_columns = _lazy_materialization_read ? _predicate_columns : block->schema()->column_ids();
+
+    // phase 1: read rows selected by various index (indicated by _row_bitmap) into block
+    // when using lazy-materialization-read, only columns with predicates are read
     do {
         uint32_t range_from;
         uint32_t range_to;
@@ -370,14 +457,17 @@ Status SegmentIterator::next_batch(RowBlockV2* block) {
         if (!has_next_range) {
             break;
         }
+        if (_lazy_materialization_read) {
+            for (uint32_t rid = range_from; rid < range_to; rid++) {
+                _block_rowids.push_back(rid);
+            }
+        }
         if (_cur_rowid == 0 || _cur_rowid != range_from) {
             _cur_rowid = range_from;
-            RETURN_IF_ERROR(_seek_columns(block->schema()->column_ids(), _cur_rowid));
+            RETURN_IF_ERROR(_seek_columns(read_columns, _cur_rowid));
         }
         size_t rows_to_read = range_to - range_from;
-        size_t rows_read = rows_to_read;
-        RETURN_IF_ERROR(_next_batch(block, total_read, &rows_read));
-        DCHECK_EQ(rows_to_read, rows_read);
+        RETURN_IF_ERROR(_read_columns(read_columns, block, total_read, rows_to_read));
         _cur_rowid += rows_to_read;
         total_read += rows_to_read;
         remaining -= rows_to_read;
@@ -388,28 +478,48 @@ Status SegmentIterator::next_batch(RowBlockV2* block) {
     if (total_read == 0) {
         return Status::EndOfFile("no more data in segment");
     }
-
     // update raw_rows_read counter
     // judge nullptr for unit test case
     _opts.stats->raw_rows_read += total_read;
+    _opts.stats->blocks_load += 1;
 
-    // column predicate vectorization execution
-    // TODO(hkp): lazy materialization
+    // phase 2: run vectorization evaluation on remaining predicates to prune rows.
+    // block's selection vector will be set to indicate which rows have passed predicates.
     // TODO(hkp): optimize column predicate to check column block once for one column
-    if (_opts.column_predicates != nullptr) {
+    if (!_col_predicates.empty()) {
         // init selection position index
         uint16_t selected_size = block->selected_size();
         uint16_t original_size = selected_size;
         SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
-        for (auto column_predicate : *_opts.column_predicates) {
+        for (auto column_predicate : _col_predicates) {
             auto column_block = block->column_block(column_predicate->column_id());
             column_predicate->evaluate(&column_block, block->selection_vector(), &selected_size);
         }
         block->set_selected_size(selected_size);
         _opts.stats->rows_vec_cond_filtered += original_size - selected_size;
     }
-    ++_opts.stats->blocks_load;
+
+    // phase 3: read non-predicate columns of rows that have passed predicates
+    if (_lazy_materialization_read) {
+        uint16_t i = 0;
+        const uint16_t* sv = block->selection_vector();
+        const uint16_t sv_size = block->selected_size();
+        while (i < sv_size) {
+            // i: start offset of the current range
+            // j: past the last offset of the current range
+            uint16_t j = i + 1;
+            while (j < sv_size && _block_rowids[sv[j]] == _block_rowids[sv[j - 1]] + 1) {
+                ++j;
+            }
+            uint16_t range_size = j - i;
+            RETURN_IF_ERROR(_seek_columns(_non_predicate_columns, _block_rowids[sv[i]]));
+            RETURN_IF_ERROR(_read_columns(_non_predicate_columns, block, sv[i], range_size));
+            i += range_size;
+        }
+    }
+
     return Status::OK();
+
 }
 
 }
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.h b/be/src/olap/rowset/segment_v2/segment_iterator.h
index fbc3d1c..0b71252 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -47,15 +47,15 @@ class SegmentIterator : public RowwiseIterator {
 public:
     SegmentIterator(std::shared_ptr<Segment> segment, const Schema& _schema);
     ~SegmentIterator() override;
-    Status init(const StorageReadOptions& opts) override {
-        _opts = opts;
-        return Status::OK();
-    }
+    Status init(const StorageReadOptions& opts) override;
     Status next_batch(RowBlockV2* row_block) override;
     const Schema& schema() const override { return _schema; }
 private:
     Status _init();
 
+    Status _init_return_column_iterators();
+    Status _init_bitmap_index_iterators();
+
     // calculate row ranges that fall into requested key ranges using short key index
     Status _get_row_ranges_by_keys();
     Status _prepare_seek(const StorageReadOptions::KeyRange& key_range);
@@ -65,14 +65,10 @@ private:
 
     // calculate row ranges that satisfy requested column conditions using various column index
     Status _get_row_ranges_by_column_conditions();
-
     Status _get_row_ranges_from_conditions(RowRanges* condition_row_ranges);
+    Status _apply_bitmap_index();
 
-    Status _get_row_ranges_from_bitmap_index(Roaring* bitmap_row_ranges);
-
-    Status _init_column_iterators();
-
-    Status _init_bitmap_index_iterators();
+    void _init_lazy_materialization();
 
     Status _next_batch(RowBlockV2* block, size_t row_offset, size_t* rows_read);
 
@@ -80,57 +76,11 @@ private:
     uint32_t num_rows() const { return _segment->num_rows(); }
 
     Status _seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos);
+    // read `nrows` of columns specified by `column_ids` into `block` at `row_offset`.
+    Status _read_columns(const std::vector<ColumnId>& column_ids, RowBlockV2* block, size_t row_offset, size_t nrows);
 
 private:
-    class BitmapRangeIterator {
-       public:
-        explicit BitmapRangeIterator(const Roaring& bitmap) {
-            roaring_init_iterator(&bitmap.roaring, &_iter);
-            _last_val = 0;
-            _buf = new uint32_t[256];
-            _read_next_batch();
-        }
-
-        ~BitmapRangeIterator() {
-            delete[] _buf;
-        }
-
-        bool has_more_range() const { return !_eof; }
-
-        bool next_range(uint32_t max_range_size, uint32_t* from, uint32_t* to) {
-            if (_eof) {
-                return false;
-            }
-            *from = _buf[_buf_pos];
-            uint32_t range_size = 0;
-            do {
-                _last_val = _buf[_buf_pos];
-                _buf_pos++;
-                range_size++;
-                if (_buf_pos == _buf_size) { // read next batch
-                    _read_next_batch();
-                }
-            } while (range_size < max_range_size && !_eof && _buf[_buf_pos] == _last_val + 1);
-            *to = *from + range_size;
-            return true;
-        }
-
-       private:
-        void _read_next_batch() {
-            uint32_t n = roaring_read_uint32_iterator(&_iter, _buf, kBatchSize);
-            _buf_pos = 0;
-            _buf_size = n;
-            _eof = n == 0;
-        }
-
-        static const uint32_t kBatchSize = 256;
-        roaring_uint32_iterator_t _iter;
-        uint32_t _last_val;
-        uint32_t* _buf = nullptr;
-        uint32_t _buf_pos;
-        uint32_t _buf_size;
-        bool _eof;
-    };
+    class BitmapRangeIterator;
 
     std::shared_ptr<Segment> _segment;
     // TODO(zc): rethink if we need copy it
@@ -146,10 +96,24 @@ private:
     std::unique_ptr<BitmapRangeIterator> _range_iter;
     // the next rowid to read
     rowid_t _cur_rowid;
+    // members related to lazy materialization read
+    // --------------------------------------------
+    // whether lazy materialization read should be used.
+    bool _lazy_materialization_read;
+    // columns to read before predicate evaluation
+    std::vector<ColumnId> _predicate_columns;
+    // columns to read after predicate evaluation
+    std::vector<ColumnId> _non_predicate_columns;
+    // remember the rowids we've read for the current row block.
+    // could be a local variable of next_batch(), kept here to reuse vector memory
+    std::vector<rowid_t> _block_rowids;
+
     // the actual init process is delayed to the first call to next_batch()
     bool _inited;
 
     StorageReadOptions _opts;
+    // make a copy of `_opts.column_predicates` in order to make local changes
+    std::vector<ColumnPredicate*> _col_predicates;
 
     // row schema of the key to seek
     // only used in `_get_row_ranges_by_keys`


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org