You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 15:29:03 UTC
[incubator-doris] 08/22: [Improvement] optimize scannode concurrency query performance in vectorized engine. (#9792)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit d40fcdc58d2be1031534c669aa61f8a96379ca58
Author: yiguolei <67...@qq.com>
AuthorDate: Mon May 30 16:04:40 2022 +0800
[Improvement] optimize scannode concurrency query performance in vectorized engine. (#9792)
---
be/src/common/config.h | 4 ++++
be/src/exec/olap_scan_node.cpp | 14 ++++++++----
be/src/exec/olap_scan_node.h | 9 ++++++--
be/src/exec/olap_scanner.cpp | 1 -
be/src/olap/bloom_filter_predicate.h | 17 +++++++++++++-
be/src/vec/exec/volap_scan_node.cpp | 44 ++++++++++++++++++++++++++++--------
6 files changed, 70 insertions(+), 19 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 48582beabe..90faf93006 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -718,6 +718,10 @@ CONF_Int32(object_pool_buffer_size, "100");
// ParquetReaderWrap prefetch buffer size
CONF_Int32(parquet_reader_max_buffer_size, "50");
+// When the number of evaluated rows reaches this limit, the filter rate of the bloom filter
+// is checked; if it is lower than a specific threshold, the predicate will be disabled.
+CONF_mInt32(bloom_filter_predicate_check_row_num, "1000");
+
} // namespace config
} // namespace doris
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 0d40d40b3a..7ea7ca3510 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -175,6 +175,8 @@ Status OlapScanNode::prepare(RuntimeState* state) {
// create scanner profile
// create timer
_tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT);
+ _scanner_sched_counter = ADD_COUNTER(runtime_profile(), "ScannerSchedCount ", TUnit::UNIT);
+
_rows_pushed_cond_filtered_counter =
ADD_COUNTER(_scanner_profile, "RowsPushedCondFiltered", TUnit::UNIT);
_init_counter(state);
@@ -679,11 +681,11 @@ Status OlapScanNode::build_scan_key() {
return Status::OK();
}
-static Status get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range,
- int block_row_count, bool is_begin_include, bool is_end_include,
- const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
- std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
- RuntimeProfile* profile) {
+Status OlapScanNode::get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range,
+ int block_row_count, bool is_begin_include, bool is_end_include,
+ const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
+ std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
+ RuntimeProfile* profile) {
RuntimeProfile::Counter* show_hints_timer = profile->get_counter("ShowHintsTime_V1");
std::vector<std::vector<OlapTuple>> ranges;
bool have_valid_range = false;
@@ -1439,6 +1441,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
std::bind(&OlapScanNode::scanner_thread, this, *iter));
if (s.ok()) {
(*iter)->start_wait_worker_timer();
+ COUNTER_UPDATE(_scanner_sched_counter, 1);
olap_scanners.erase(iter++);
} else {
LOG(FATAL) << "Failed to assign scanner task to thread pool! "
@@ -1453,6 +1456,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
task.priority = _nice;
task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
(*iter)->start_wait_worker_timer();
+ COUNTER_UPDATE(_scanner_sched_counter, 1);
if (thread_pool->offer(task)) {
olap_scanners.erase(iter++);
} else {
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index 1220a742e0..4cd2902b93 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -57,7 +57,12 @@ public:
Status collect_query_statistics(QueryStatistics* statistics) override;
Status close(RuntimeState* state) override;
Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override;
- inline void set_no_agg_finalize() { _need_agg_finalize = false; }
+ void set_no_agg_finalize() { _need_agg_finalize = false; }
+ Status get_hints(TabletSharedPtr table, const TPaloScanRange& scan_range, int block_row_count,
+ bool is_begin_include, bool is_end_include,
+ const std::vector<std::unique_ptr<OlapScanRange>>& scan_key_range,
+ std::vector<std::unique_ptr<OlapScanRange>>* sub_scan_range,
+ RuntimeProfile* profile);
protected:
struct HeapType {
@@ -246,7 +251,7 @@ protected:
RuntimeProfile::Counter* _tablet_counter;
RuntimeProfile::Counter* _rows_pushed_cond_filtered_counter = nullptr;
RuntimeProfile::Counter* _reader_init_timer = nullptr;
-
+ RuntimeProfile::Counter* _scanner_sched_counter = nullptr;
TResourceInfo* _resource_info;
std::atomic<int64_t> _buffered_bytes;
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 9baef00001..4e2003ae0b 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -308,7 +308,6 @@ Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
if (UNLIKELY(*eof)) {
break;
}
-
_num_rows_read++;
_convert_row_to_tuple(tuple);
diff --git a/be/src/olap/bloom_filter_predicate.h b/be/src/olap/bloom_filter_predicate.h
index c8fbdab94c..a7671f2724 100644
--- a/be/src/olap/bloom_filter_predicate.h
+++ b/be/src/olap/bloom_filter_predicate.h
@@ -71,6 +71,9 @@ public:
private:
std::shared_ptr<IBloomFilterFuncBase> _filter;
SpecificFilter* _specific_filter; // owned by _filter
+ mutable uint64_t _evaluated_rows = 1;
+ mutable uint64_t _passed_rows = 0;
+ mutable bool _enable_pred = true;
};
// bloom filter column predicate do not support in segment v1
@@ -113,7 +116,9 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16
uint16_t* size) const {
uint16_t new_size = 0;
using FT = typename PredicatePrimitiveTypeTraits<T>::PredicateFieldType;
-
+ if (!_enable_pred) {
+ return;
+ }
if (column.is_nullable()) {
auto* nullable_col = vectorized::check_and_get_column<vectorized::ColumnNullable>(column);
auto& null_map_data = nullable_col->get_null_map_column().get_data();
@@ -158,6 +163,16 @@ void BloomFilterColumnPredicate<T>::evaluate(vectorized::IColumn& column, uint16
new_size += _specific_filter->find_olap_engine(cell_value);
}
}
+ // If the pass rate is very high (for example > 50%), the bloom filter is useless.
+ // Some bloom filters, for example in SSB 4.3, consume a lot of CPU while filtering
+ // out very few rows.
+ _evaluated_rows += *size;
+ _passed_rows += new_size;
+ if (_evaluated_rows > config::bloom_filter_predicate_check_row_num) {
+ if (_passed_rows / (_evaluated_rows * 1.0) > 0.5) {
+ _enable_pred = false;
+ }
+ }
*size = new_size;
}
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index 3156324d66..c54312d16d 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -196,9 +196,13 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
int64_t raw_bytes_read = 0;
int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
bool get_free_block = true;
+ int num_rows_in_block = 0;
- while (!eos && raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold &&
- get_free_block) {
+ // Must accumulate at least one full block; otherwise many scheduling tasks are pushed to the
+ // priority queue, which hurts query latency and query concurrency (for example, SSB 3.3).
+ while (!eos && ((raw_rows_read < raw_rows_threshold && raw_bytes_read < raw_bytes_threshold &&
+ get_free_block) ||
+ num_rows_in_block < _runtime_state->batch_size())) {
if (UNLIKELY(_transfer_done)) {
eos = true;
status = Status::Cancelled("Cancelled");
@@ -218,7 +222,7 @@ void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
}
raw_bytes_read += block->allocated_bytes();
-
+ num_rows_in_block += block->rows();
// 4. if status not ok, change status_.
if (UNLIKELY(block->rows() == 0)) {
std::lock_guard<std::mutex> l(_free_blocks_lock);
@@ -324,6 +328,12 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
if (cond_ranges.empty()) {
cond_ranges.emplace_back(new OlapScanRange());
}
+ bool need_split = true;
+ // If a limit is set or there are more than 64 ranges, there is no need to call
+ // ShowHint to split the ranges.
+ if (limit() != -1 || cond_ranges.size() > 64) {
+ need_split = false;
+ }
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
std::unordered_set<std::string> disk_set;
@@ -341,6 +351,16 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
return Status::InternalError(ss.str());
}
+ std::vector<std::unique_ptr<OlapScanRange>>* ranges = &cond_ranges;
+ std::vector<std::unique_ptr<OlapScanRange>> split_ranges;
+ if (need_split && !tablet->all_beta()) {
+ auto st = get_hints(tablet, *scan_range, config::doris_scan_range_row_count,
+ _scan_keys.begin_include(), _scan_keys.end_include(), cond_ranges,
+ &split_ranges, _runtime_profile.get());
+ if (st.ok()) {
+ ranges = &split_ranges;
+ }
+ }
int size_based_scanners_per_tablet = 1;
if (config::doris_scan_range_max_mb > 0) {
@@ -349,17 +369,17 @@ Status VOlapScanNode::start_scan_thread(RuntimeState* state) {
}
int ranges_per_scanner =
- std::max(1, (int)cond_ranges.size() /
+ std::max(1, (int)ranges->size() /
std::min(scanners_per_tablet, size_based_scanners_per_tablet));
- int num_ranges = cond_ranges.size();
+ int num_ranges = ranges->size();
for (int i = 0; i < num_ranges;) {
std::vector<OlapScanRange*> scanner_ranges;
- scanner_ranges.push_back(cond_ranges[i].get());
+ scanner_ranges.push_back((*ranges)[i].get());
++i;
for (int j = 1; i < num_ranges && j < ranges_per_scanner &&
- cond_ranges[i]->end_include == cond_ranges[i - 1]->end_include;
+ (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
++j, ++i) {
- scanner_ranges.push_back(cond_ranges[i].get());
+ scanner_ranges.push_back((*ranges)[i].get());
}
VOlapScanner* scanner = new VOlapScanner(state, this, _olap_scan_node.is_preaggregation,
_need_agg_finalize, *scan_range);
@@ -551,8 +571,11 @@ Block* VOlapScanNode::_alloc_block(bool& get_free_block) {
int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per_scanner) {
std::list<VOlapScanner*> olap_scanners;
int assigned_thread_num = _running_thread;
- size_t max_thread = std::min(_volap_scanners.size(),
- static_cast<size_t>(config::doris_scanner_thread_pool_thread_num));
+ size_t max_thread = config::doris_scanner_queue_size;
+ if (config::doris_scanner_row_num > state->batch_size()) {
+ max_thread /= config::doris_scanner_row_num / state->batch_size();
+ if (max_thread <= 0) max_thread = 1;
+ }
// copy to local
{
// How many thread can apply to this query
@@ -606,6 +629,7 @@ int VOlapScanNode::_start_scanner_thread_task(RuntimeState* state, int block_per
task.priority = _nice;
task.queue_id = state->exec_env()->store_path_to_index((*iter)->scan_disk());
(*iter)->start_wait_worker_timer();
+ COUNTER_UPDATE(_scanner_sched_counter, 1);
if (thread_pool->offer(task)) {
olap_scanners.erase(iter++);
} else {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org