You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 15:29:03 UTC

[incubator-doris] 08/22: [Improvement] optimize scannode concurrency query performance in vectorized engine. (#9792)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit d40fcdc58d2be1031534c669aa61f8a96379ca58
Author: yiguolei <67...@qq.com>
AuthorDate: Mon May 30 16:04:40 2022 +0800

    [Improvement] optimize  scannode concurrency query performance in vectorized engine. (#9792)
---
 be/src/common/config.h               |  4 ++++
 be/src/exec/olap_scan_node.cpp       | 14 ++++++++----
 be/src/exec/olap_scan_node.h         |  9 ++++++--
 be/src/exec/olap_scanner.cpp         |  1 -
 be/src/olap/bloom_filter_predicate.h | 17 +++++++++++++-
 be/src/vec/exec/volap_scan_node.cpp  | 44 ++++++++++++++++++++++++++++--------
 6 files changed, 70 insertions(+), 19 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 48582beabe..90faf93006 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -718,6 +718,10 @@ CONF_Int32(object_pool_buffer_size, "100");
 // ParquetReaderWrap prefetch buffer size
 CONF_Int32(parquet_reader_max_buffer_size, "50");
 
+// When the number of rows reaches this limit, the filter rate of the bloomfilter is checked;
+// if it is lower than a specific threshold, the predicate will be disabled.
+CONF_mInt32(bloom_filter_predicate_check_row_num, "1000");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 0d40d40b3a..7ea7ca3510 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -175,6 +175,8 @@ Status OlapScanNode::prepare(RuntimeState* state) {
     // create scanner profile
     // create timer
     _tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT);
+    _scanner_sched_counter = ADD_COUNTER(runtime_profile(), "ScannerSchedCount ", TUnit::UNIT);
+
     _rows_pushed_cond_filtered_counter =
             ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT);
     _init_counter(state);
@@ -679,11 +681,11 @@ Status OlapScanNode::build_scan_key() {
     return Status::OK();
 }
 
-static Status get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range,
-                        int block_row_count, bool is_begin_include, bool is_end_include,
-                        const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
-                        std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
-                        RuntimeProfile* profile) {
+Status OlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range,
+                               int block_row_count, bool is_begin_include, bool is_end_include,
+                               const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
+                               std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
+                               RuntimeProfile* profile) {
     RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime_V1");
     std::vector<std::vector<OlapTuple>> ranges;
     bool have_valid_range = false;
@@ -1439,6 +1441,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
                         std::bind(&OlapScanNode::scanner_thread, this, *iter));
                 if (s.ok()) {
                     (*iter)->start_wait_worker_timer();
+                    COUNTER_UPDATE(_scanner_sched_counter, 1);
                     olap_scanners.erase(iter++);
                 } else {
                     LOG(FATAL) << "Failed to assign scanner task to thread pool! "
@@ -1453,6 +1456,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
                 task.priority = _nice;
                 task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
                 (*iter)->start_wait_worker_timer();
+                COUNTER_UPDATE(_scanner_sched_counter, 1);
                 if (thread_pool->offer(task)) {
                     olap_scanners.erase(iter++);
                 } else {
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 1220a742e0..4cd2902b93 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -57,7 +57,12 @@ public:
     Status collect_query_statistics(QueryStatistics* statistics) override;
     Status close(RuntimeState* state) override;
     Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
-    inline void set_no_agg_finalize() { _need_agg_finalize = false; }
+    void set_no_agg_finalize() { _need_agg_finalize = false; }
+    Status get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range, int block_row_count,
+                     bool is_begin_include, bool is_end_include,
+                     const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
+                     std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
+                     RuntimeProfile* profile);
 
 protected:
     struct HeapType {
@@ -246,7 +251,7 @@ protected:
     RuntimeProfile::Counter* _tablet_counter;
     RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr;
     RuntimeProfile::Counter* _reader_init_timer = nullptr;
-
+    RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
     TResourceInfo* _resource_info;
 
     std::atomic<int64_t> _buffered_bytes;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 9baef00001..4e2003ae0b 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -308,7 +308,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
             if (UNLIKELY(*eof)) {
                 break;
             }
-
             _num_rows_read++;
 
             _convert_row_to_tuple(tuple);
diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
index c8fbdab94c..a7671f2724 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -71,6 +71,9 @@ public:
 private:
     std::shared_ptr<IBloomFilterFuncBase> _filter;
     SpecificFilter* _specific_filter; // owned by _filter
+    mutable uint64_t _evaluated_rows = 1;
+    mutable uint64_t _passed_rows = 0;
+    mutable bool _enable_pred = true;
 };
 
 // bloom filter column predicate do not support in segment v1
@@ -113,7 +116,9 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16
                                                 uint16_t* size) const {
     uint16_t new_size = 0;
     using FT = typename PredicatePrimitiveTypeTraits<T>::PredicateFieldType;
-
+    if (!_enable_pred) {
+        return;
+    }
     if (column.is_nullable()) {
         auto* nullable_col = vectorized::check_and_get_column<vectorized::ColumnNullable>(column);
         auto& null_map_data = nullable_col->get_null_map_column().get_data();
@@ -158,6 +163,16 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16
             new_size += _specific_filter->find_olap_engine(cell_value);
         }
     }
+    // If the pass rate is very high (for example > 50%), the bloomfilter is useless:
+    // in some queries, for example ssb 4.3, it consumes a lot of CPU but filters out
+    // few rows, so the predicate is disabled.
+    _evaluated_rows += *size;
+    _passed_rows += new_size;
+    if (_evaluated_rows > config::bloom_filter_predicate_check_row_num) {
+        if (_passed_rows / (_evaluated_rows * 1.0) > 0.5) {
+            _enable_pred = false;
+        }
+    }
     *size = new_size;
 }
 
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 3156324d66..c54312d16d 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -196,9 +196,13 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     int64_t raw_bytes_read = 0;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
     bool get_free_block = true;
+    int num_rows_in_block = 0;
 
-    while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold &&
-           get_free_block) {
+    // Has to wait for at least one full block; otherwise a lot of scheduling tasks are put into
+    // the priority queue, which hurts query latency and concurrency (for example ssb 3.3).
+    while (!eos && ((raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold &&
+                     get_free_block) ||
+                    num_rows_in_block < _runtime_state->batch_size())) {
         if (UNLIKELY(_transfer_done)) {
             eos = true;
             status = Status::Cancelled("Cancelled");
@@ -218,7 +222,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
         }
 
         raw_bytes_read += block->allocated_bytes();
-
+        num_rows_in_block += block->rows();
         // 4. if status not ok, change status_.
         if (UNLIKELY(block->rows() == 0)) {
             std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -324,6 +328,12 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
     if (cond_ranges.empty()) {
         cond_ranges.emplace_back(new OlapScanRange());
     }
+    bool need_split = true;
+    // If a limit is set or we have more than 64 ranges, there is no need to call
+    // ShowHint to split ranges.
+    if (limit() != -1 || cond_ranges.size() > 64) {
+        need_split = false;
+    }
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
 
     std::unordered_set<std::string> disk_set;
@@ -341,6 +351,16 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
             return Status::InternalError(ss.str());
         }
 
+        std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
+        std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
+        if (need_split && !tablet->all_beta()) {
+            auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count,
+                                _scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges,
+                                &split_ranges, _runtime_profile.get());
+            if (st.ok()) {
+                ranges = &split_ranges;
+            }
+        }
         int size_based_scanners_per_tablet = 1;
 
         if (config::doris_scan_range_max_mb > 0) {
@@ -349,17 +369,17 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
         }
 
         int ranges_per_scanner =
-                std::max(1, (int)cond_ranges.size() /
+                std::max(1, (int)ranges->size() /
                                     std::min(scanners_per_tablet, size_based_scanners_per_tablet));
-        int num_ranges = cond_ranges.size();
+        int num_ranges = ranges->size();
         for (int i = 0; i < num_ranges;) {
             std::vector<OlapScanRange*> scanner_ranges;
-            scanner_ranges.push_back(cond_ranges[i].get());
+            scanner_ranges.push_back((*ranges)[i].get());
             ++i;
             for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
-                            cond_ranges[i]->end_include == cond_ranges[i - 1]->end_include;
+                            (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
                  ++j, ++i) {
-                scanner_ranges.push_back(cond_ranges[i].get());
+                scanner_ranges.push_back((*ranges)[i].get());
             }
             VOlapScanner* scanner = new VOlapScanner(state, this, _olap_scan_node.is_preaggregation,
                                                      _need_agg_finalize, *scan_range);
@@ -551,8 +571,11 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
 int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) {
     std::list<VOlapScanner*> olap_scanners;
     int assigned_thread_num = _running_thread;
-    size_t max_thread = std::min(_volap_scanners.size(),
-                                 static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
+    size_t max_thread = config::doris_scanner_queue_size;
+    if (config::doris_scanner_row_num > state->batch_size()) {
+        max_thread /= config::doris_scanner_row_num / state->batch_size();
+        if (max_thread <= 0) max_thread = 1;
+    }
     // copy to local
     {
         // How many thread can apply to this query
@@ -606,6 +629,7 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
         task.priority = _nice;
         task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
         (*iter)->start_wait_worker_timer();
+        COUNTER_UPDATE(_scanner_sched_counter, 1);
         if (thread_pool->offer(task)) {
             olap_scanners.erase(iter++);
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org