You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2021/08/26 01:15:00 UTC

[incubator-doris] branch master updated: push down conditions on unique table value columns to base rowset (#6457)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ca3eb64  push down conditions on unique table value columns to base rowset (#6457)
ca3eb64 is described below

commit ca3eb6490e6facc8dc8889d4b917208ba93b0c7d
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Thu Aug 26 09:14:49 2021 +0800

    push down conditions on unique table value columns to base rowset (#6457)
---
 be/src/exec/olap_scanner.cpp                       |  1 +
 be/src/olap/olap_cond.h                            |  3 +-
 be/src/olap/reader.cpp                             | 46 +++++++++++++++-------
 be/src/olap/reader.h                               |  7 ++++
 be/src/olap/rowset/beta_rowset_reader.cpp          | 13 ++++--
 be/src/olap/rowset/rowset_reader_context.h         |  3 ++
 be/src/olap/rowset/segment_v2/segment_iterator.cpp | 16 +++++---
 7 files changed, 65 insertions(+), 24 deletions(-)

diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 073c92b..5f977ed 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -182,6 +182,7 @@ Status OlapScanner::_init_params(
              _params.rs_readers[0]->rowset()->rowset_meta()->num_rows() == 0 &&
              _params.rs_readers[1]->rowset()->start_version() == 2 &&
              !_params.rs_readers[1]->rowset()->rowset_meta()->is_segments_overlapping());
+
     if (_aggregation || single_version) {
         _params.return_columns = _return_columns;
     } else {
diff --git a/be/src/olap/olap_cond.h b/be/src/olap/olap_cond.h
index 54d699e..84e56be 100644
--- a/be/src/olap/olap_cond.h
+++ b/be/src/olap/olap_cond.h
@@ -161,6 +161,7 @@ public:
         }
         _columns.clear();
     }
+    bool empty() const { return _columns.empty(); }
 
     // TODO(yingchun): should do it in constructor
     void set_tablet_schema(const TabletSchema* schema) { _schema = schema; }
@@ -170,7 +171,7 @@ public:
     // 1. column不属于key列
     // 2. column类型是double, float
     OLAPStatus append_condition(const TCondition& condition);
-    
+
     // 通过所有列上的删除条件对RowCursor进行过滤
     // Return true means this row should be filtered out, otherwise return false
     bool delete_conditions_eval(const RowCursor& row) const;
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 4776732..d5ca432 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -17,9 +17,10 @@
 
 #include "olap/reader.h"
 
+#include <parallel_hashmap/phmap.h>
+
 #include <boost/algorithm/string/case_conv.hpp>
 #include <charconv>
-#include <parallel_hashmap/phmap.h>
 #include <unordered_set>
 
 #include "olap/bloom_filter_predicate.h"
@@ -31,8 +32,8 @@
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
 #include "olap/rowset/beta_rowset_reader.h"
-#include "olap/schema.h"
 #include "olap/rowset/column_data.h"
+#include "olap/schema.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "runtime/mem_pool.h"
@@ -303,6 +304,9 @@ OLAPStatus Reader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_pool
 void Reader::close() {
     VLOG_NOTICE << "merged rows:" << _merged_rows;
     _conditions.finalize();
+    if (!_all_conditions.empty()) {
+        _all_conditions.finalize();
+    }
     _delete_handler.finalize();
 
     for (auto pred : _col_predicates) {
@@ -393,7 +397,9 @@ OLAPStatus Reader::_capture_rs_readers(const ReaderParams& read_params,
     _reader_context.return_columns = &_return_columns;
     _reader_context.seek_columns = &_seek_columns;
     _reader_context.load_bf_columns = &_load_bf_columns;
+    _reader_context.load_bf_all_columns = &_load_bf_all_columns;
     _reader_context.conditions = &_conditions;
+    _reader_context.all_conditions = &_all_conditions;
     _reader_context.predicates = &_col_predicates;
     _reader_context.value_predicates = &_value_col_predicates;
     _reader_context.lower_bound_keys = &_keys_param.start_keys;
@@ -563,11 +569,13 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
     std::vector<uint32_t> columns(scan_key_size);
     std::iota(columns.begin(), columns.end(), 0);
 
-    std::shared_ptr<Schema> schema = std::make_shared<Schema>(_tablet->tablet_schema().columns(), columns);
+    std::shared_ptr<Schema> schema =
+            std::make_shared<Schema>(_tablet->tablet_schema().columns(), columns);
 
     for (size_t i = 0; i < start_key_size; ++i) {
         if (read_params.start_key[i].size() != scan_key_size) {
-            OLAP_LOG_WARNING("The start_key.at(%ld).size == %ld, not equals the %ld", i, read_params.start_key[i].size(), scan_key_size);
+            OLAP_LOG_WARNING("The start_key.at(%ld).size == %ld, not equals the %ld", i,
+                             read_params.start_key[i].size(), scan_key_size);
             return OLAP_ERR_INPUT_PARAMETER_ERROR;
         }
 
@@ -594,7 +602,8 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
     _keys_param.end_keys.resize(end_key_size, nullptr);
     for (size_t i = 0; i < end_key_size; ++i) {
         if (read_params.end_key[i].size() != scan_key_size) {
-            OLAP_LOG_WARNING("The end_key.at(%ld).size == %ld, not equals the %ld", i, read_params.end_key[i].size(), scan_key_size);
+            OLAP_LOG_WARNING("The end_key.at(%ld).size == %ld, not equals the %ld", i,
+                             read_params.end_key[i].size(), scan_key_size);
             return OLAP_ERR_INPUT_PARAMETER_ERROR;
         }
 
@@ -603,8 +612,8 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
             return OLAP_ERR_MALLOC_ERROR;
         }
 
-        OLAPStatus res = _keys_param.end_keys[i]->init_scan_key(_tablet->tablet_schema(),
-                                                                read_params.end_key[i].values(), schema);
+        OLAPStatus res = _keys_param.end_keys[i]->init_scan_key(
+                _tablet->tablet_schema(), read_params.end_key[i].values(), schema);
         if (res != OLAP_SUCCESS) {
             OLAP_LOG_WARNING("fail to init row cursor. [res=%d]", res);
             return res;
@@ -624,6 +633,7 @@ OLAPStatus Reader::_init_keys_param(const ReaderParams& read_params) {
 
 void Reader::_init_conditions_param(const ReaderParams& read_params) {
     _conditions.set_tablet_schema(&_tablet->tablet_schema());
+    _all_conditions.set_tablet_schema(&_tablet->tablet_schema());
     for (const auto& condition : read_params.conditions) {
         ColumnPredicate* predicate = _parse_to_predicate(condition);
         if (predicate != nullptr) {
@@ -636,6 +646,8 @@ void Reader::_init_conditions_param(const ReaderParams& read_params) {
                 OLAPStatus status = _conditions.append_condition(condition);
                 DCHECK_EQ(OLAP_SUCCESS, status);
             }
+            OLAPStatus status = _all_conditions.append_condition(condition);
+            DCHECK_EQ(OLAP_SUCCESS, status);
         }
     }
 
@@ -849,7 +861,8 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
             int128_t value = 0;
             StringParser::ParseResult result;
             for (auto& cond_val : condition.condition_values) {
-                value = StringParser::string_to_int<__int128>(cond_val.c_str(), cond_val.size(), &result);
+                value = StringParser::string_to_int<__int128>(cond_val.c_str(), cond_val.size(),
+                                                              &result);
                 values.insert(value);
             }
             if (condition.condition_op == "*=") {
@@ -947,17 +960,22 @@ ColumnPredicate* Reader::_parse_to_predicate(const TCondition& condition, bool o
     }
     return predicate;
 }
-
 void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
-    // add all columns with condition to _load_bf_columns
-    for (const auto& cond_column : _conditions.columns()) {
+    _init_load_bf_columns(read_params, &_conditions, &_load_bf_columns);
+    _init_load_bf_columns(read_params, &_all_conditions, &_load_bf_all_columns);
+}
+
+void Reader::_init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions,
+                                   std::set<uint32_t>* load_bf_columns) {
+    // add all columns with condition to load_bf_columns
+    for (const auto& cond_column : conditions->columns()) {
         if (!_tablet->tablet_schema().column(cond_column.first).is_bf_column()) {
             continue;
         }
         for (const auto& cond : cond_column.second->conds()) {
             if (cond->op == OP_EQ ||
                 (cond->op == OP_IN && cond->operand_set.size() < MAX_OP_IN_FIELD_NUM)) {
-                _load_bf_columns.insert(cond_column.first);
+                load_bf_columns->insert(cond_column.first);
             }
         }
     }
@@ -986,7 +1004,7 @@ void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
     }
 
     for (int i = 0; i < max_equal_index; ++i) {
-        _load_bf_columns.erase(i);
+        load_bf_columns->erase(i);
     }
 
     // remove the max_equal_index column when it's not varchar
@@ -996,7 +1014,7 @@ void Reader::_init_load_bf_columns(const ReaderParams& read_params) {
     }
     FieldType type = _tablet->tablet_schema().column(max_equal_index).type();
     if ((type != OLAP_FIELD_TYPE_VARCHAR && type != OLAP_FIELD_TYPE_STRING)|| max_equal_index + 1 > _tablet->num_short_key_columns()) {
-        _load_bf_columns.erase(max_equal_index);
+        load_bf_columns->erase(max_equal_index);
     }
 }
 
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 2d62aab..525c2ad 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -162,6 +162,8 @@ private:
     void _init_seek_columns();
 
     void _init_load_bf_columns(const ReaderParams& read_params);
+    void _init_load_bf_columns(const ReaderParams& read_params, Conditions* conditions,
+                               std::set<uint32_t>* load_bf_columns);
 
     // Direcly read row from rowset and pass to upper caller. No need to do aggregation.
     // This is usually used for DUPLICATE KEY tables
@@ -187,6 +189,7 @@ private:
     std::shared_ptr<MemTracker> _tracker;
     std::unique_ptr<MemPool> _predicate_mem_pool;
     std::set<uint32_t> _load_bf_columns;
+    std::set<uint32_t> _load_bf_all_columns;
     std::vector<uint32_t> _return_columns;
     std::vector<uint32_t> _seek_columns;
 
@@ -195,7 +198,11 @@ private:
     KeysParam _keys_param;
     std::vector<bool> _is_lower_keys_included;
     std::vector<bool> _is_upper_keys_included;
+    // conditions on key columns for AGG/UNIQUE tables, or on all columns for DUP tables
     Conditions _conditions;
+    // _conditions plus the conditions on value columns; used to push conditions
+    // down to the base rowset of a UNIQUE table
+    Conditions _all_conditions;
     std::vector<ColumnPredicate*> _col_predicates;
     std::vector<ColumnPredicate*> _value_col_predicates;
     DeleteHandler _delete_handler;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index ee13dad..f50eb87 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -79,11 +79,16 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
     }
     // if unique table with rowset [0-x] or [0-1] [2-y] [...],
     // value column predicates can be pushdown on rowset [0-x] or [2-y]
-    if (read_context->value_predicates != nullptr && _rowset->keys_type() == UNIQUE_KEYS &&
+    if (_rowset->keys_type() == UNIQUE_KEYS &&
         (_rowset->start_version() == 0 || _rowset->start_version() == 2)) {
-        read_options.column_predicates.insert(read_options.column_predicates.end(),
-                                              read_context->value_predicates->begin(),
-                                              read_context->value_predicates->end());
+        if (read_context->value_predicates != nullptr) {
+            read_options.column_predicates.insert(read_options.column_predicates.end(),
+                                                  read_context->value_predicates->begin(),
+                                                  read_context->value_predicates->end());
+        }
+        if (read_context->all_conditions != nullptr && !read_context->all_conditions->empty()) {
+            read_options.conditions = read_context->all_conditions;
+        }
     }
     read_options.use_page_cache = read_context->use_page_cache;
 
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 9d757dd..d1ef819 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -42,8 +42,11 @@ struct RowsetReaderContext {
     // columns to load bloom filter index
     // including columns in "=" or "in" conditions
     const std::set<uint32_t>* load_bf_columns = nullptr;
+    const std::set<uint32_t>* load_bf_all_columns = nullptr;
     // column filter conditions by delete sql
     const Conditions* conditions = nullptr;
+    // all conditions (key and value columns); pushed down to the base rowset of a UNIQUE table
+    const Conditions* all_conditions = nullptr;
     // column name -> column predicate
     // adding column_name for predicate to make use of column selectivity
     const std::vector<ColumnPredicate*>* predicates = nullptr;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index f39eaa0..5e80270 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -90,7 +90,8 @@ private:
     bool _eof;
 };
 
-SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, const Schema& schema, std::shared_ptr<MemTracker> parent)
+SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, const Schema& schema,
+                                 std::shared_ptr<MemTracker> parent)
         : _segment(std::move(segment)),
           _schema(schema),
           _column_iterators(_schema.num_columns(), nullptr),
@@ -194,11 +195,13 @@ Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_ra
     // create used column iterator
     for (auto cid : _seek_schema->column_ids()) {
         if (_column_iterators[cid] == nullptr) {
-            RETURN_IF_ERROR(_segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid]));
+            RETURN_IF_ERROR(
+                    _segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid]));
             ColumnIteratorOptions iter_opts;
             iter_opts.stats = _opts.stats;
             iter_opts.rblock = _rblock.get();
-            iter_opts.mem_tracker = MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false);
+            iter_opts.mem_tracker =
+                    MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false);
             RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts));
         }
     }
@@ -233,6 +236,7 @@ Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row
             cids.insert(column_condition.first);
         }
     }
+
     // first filter data by bloom filter index
     // bloom filter index only use CondColumn
     RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
@@ -322,12 +326,14 @@ Status SegmentIterator::_init_return_column_iterators() {
     }
     for (auto cid : _schema.column_ids()) {
         if (_column_iterators[cid] == nullptr) {
-            RETURN_IF_ERROR(_segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid]));
+            RETURN_IF_ERROR(
+                    _segment->new_column_iterator(cid, _mem_tracker, &_column_iterators[cid]));
             ColumnIteratorOptions iter_opts;
             iter_opts.stats = _opts.stats;
             iter_opts.use_page_cache = _opts.use_page_cache;
             iter_opts.rblock = _rblock.get();
-            iter_opts.mem_tracker = MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false);
+            iter_opts.mem_tracker =
+                    MemTracker::CreateTracker(-1, "ColumnIterator", _mem_tracker, false);
             RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts));
         }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org