You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/27 02:33:11 UTC
[incubator-doris] branch master updated: [Refactor] refactor olap_scan_node: discard boost, remove dynamic_cast (#6622)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 850cf10 [Refactor] refactor olap_scan_node: discard boost, remove dynamic_cast (#6622)
850cf10 is described below
commit 850cf1099189b760347c83a31b3dc85f6eee1cbc
Author: thinker <zc...@qq.com>
AuthorDate: Mon Sep 27 10:32:57 2021 +0800
[Refactor] refactor olap_scan_node: discard boost, remove dynamic_cast (#6622)
1. refactor olap_scan_node: discard boost, remove dynamic_cast
2. use move instead of copy version for push_back
---
be/src/exec/olap_common.h | 4 ++--
be/src/exec/olap_scan_node.cpp | 41 +++++++++++++--------------------
be/src/exec/olap_scan_node.h | 51 +++++-------------------------------------
3 files changed, 22 insertions(+), 74 deletions(-)
diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index 94ecd1d..7ad3d5e 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -19,9 +19,9 @@
#define DORIS_BE_SRC_QUERY_EXEC_OLAP_COMMON_H
#include <stdint.h>
+#include <variant>
#include <boost/lexical_cast.hpp>
-#include <boost/variant.hpp>
#include <map>
#include <sstream>
#include <string>
@@ -317,7 +317,7 @@ private:
bool _is_convertible;
};
-typedef boost::variant<
+typedef std::variant<
ColumnValueRange<int8_t>, ColumnValueRange<int16_t>, ColumnValueRange<int32_t>,
ColumnValueRange<int64_t>, ColumnValueRange<__int128>, ColumnValueRange<StringValue>,
ColumnValueRange<DateTimeValue>, ColumnValueRange<DecimalV2Value>, ColumnValueRange<bool>>
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 93d7947..6378fa2 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -18,7 +18,6 @@
#include "exec/olap_scan_node.h"
#include <algorithm>
-#include <boost/variant.hpp>
#include <iostream>
#include <string>
#include <utility>
@@ -50,7 +49,6 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
_tuple_desc(NULL),
_tuple_idx(0),
_eos(false),
- _scanner_pool(new ObjectPool()),
_max_materialized_row_batches(config::doris_scanner_queue_size),
_start(false),
_scanner_done(false),
@@ -186,11 +184,9 @@ Status OlapScanNode::prepare(RuntimeState* state) {
_collection_slots.push_back(slots[i]);
}
- if (!slots[i]->type().is_string_type()) {
- continue;
+ if (slots[i]->type().is_string_type()) {
+ _string_slots.push_back(slots[i]);
}
-
- _string_slots.push_back(slots[i]);
}
_runtime_state = state;
@@ -297,7 +293,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
}
if (!_materialized_row_batches.empty()) {
- materialized_batch = dynamic_cast<RowBatch*>(_materialized_row_batches.front());
+ materialized_batch = _materialized_row_batches.front();
DCHECK(materialized_batch != NULL);
_materialized_row_batches.pop_front();
}
@@ -377,7 +373,7 @@ Status OlapScanNode::close(RuntimeState* state) {
_scan_batch_added_cv.notify_all();
// join transfer thread
- _transfer_thread.join_all();
+ _transfer_thread->join();
// clear some row batch in queue
for (auto row_batch : _materialized_row_batches) {
@@ -621,17 +617,11 @@ Status OlapScanNode::normalize_conjuncts() {
Status OlapScanNode::build_olap_filters() {
for (auto& iter : _column_value_ranges) {
- ToOlapFilterVisitor visitor;
- boost::variant<std::vector<TCondition>> filters;
- boost::apply_visitor(visitor, iter.second, filters);
-
- std::vector<TCondition> new_filters = boost::get<std::vector<TCondition>>(filters);
- if (new_filters.empty()) {
- continue;
- }
+ std::vector<TCondition> filters;
+ std::visit([&](auto &&range) { range.to_olap_filter(filters); }, iter.second);
- for (const auto& filter : new_filters) {
- _olap_filter.push_back(filter);
+ for (const auto& filter : filters) {
+ _olap_filter.push_back(std::move(filter));
}
}
@@ -648,13 +638,12 @@ Status OlapScanNode::build_scan_key() {
for (int column_index = 0; column_index < column_names.size() && !_scan_keys.has_range_value();
++column_index) {
- auto column_range_iter = _column_value_ranges.find(column_names[column_index]);
- if (_column_value_ranges.end() == column_range_iter) {
+ auto iter = _column_value_ranges.find(column_names[column_index]);
+ if (_column_value_ranges.end() == iter) {
break;
}
- ExtendScanKeyVisitor visitor(_scan_keys, _max_scan_key_num);
- RETURN_IF_ERROR(boost::apply_visitor(visitor, column_range_iter->second));
+ RETURN_IF_ERROR(std::visit([&](auto &&range) { return _scan_keys.extend_scan_key(range, _max_scan_key_num); }, iter->second));
}
VLOG_CRITICAL << _scan_keys.debug_string();
@@ -790,7 +779,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
_need_agg_finalize, *scan_range, scanner_ranges);
// add scanner to pool before doing prepare.
// so that scanner can be automatically deconstructed if prepare failed.
- _scanner_pool->add(scanner);
+ _scanner_pool.add(scanner);
RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
_bloom_filters_push_down));
@@ -807,7 +796,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
ss << "ScanThread complete (node=" << id() << "):";
_progress = ProgressUpdater(ss.str(), _olap_scanners.size(), 1);
- _transfer_thread.add_thread(new boost::thread(&OlapScanNode::transfer_thread, this, state));
+ _transfer_thread = std::make_shared<std::thread>(&OlapScanNode::transfer_thread, this, state);
return Status::OK();
}
@@ -1443,7 +1432,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
}
}
- RowBatchInterface* scan_batch = NULL;
+ RowBatch* scan_batch = NULL;
{
// 1 scanner idle task not empty, assign new scanner task
std::unique_lock<std::mutex> l(_scan_batches_lock);
@@ -1658,7 +1647,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
_scan_thread_exit_cv.notify_one();
}
-Status OlapScanNode::add_one_batch(RowBatchInterface* row_batch) {
+Status OlapScanNode::add_one_batch(RowBatch* row_batch) {
{
std::unique_lock<std::mutex> l(_row_batches_lock);
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index c7142a5..983f051 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -19,8 +19,6 @@
#define DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H
#include <atomic>
-#include <boost/thread.hpp>
-#include <boost/variant/static_visitor.hpp>
#include <condition_variable>
#include <queue>
@@ -66,45 +64,6 @@ protected:
Tuple* tuple;
int id;
} HeapType;
- class IsFixedValueRangeVisitor : public boost::static_visitor<bool> {
- public:
- template <class T>
- bool operator()(T& v) const {
- return v.is_fixed_value_range();
- }
- };
-
- class GetFixedValueSizeVisitor : public boost::static_visitor<size_t> {
- public:
- template <class T>
- size_t operator()(T& v) const {
- return v.get_fixed_value_size();
- }
- };
-
- class ExtendScanKeyVisitor : public boost::static_visitor<Status> {
- public:
- ExtendScanKeyVisitor(OlapScanKeys& scan_keys, int32_t max_scan_key_num)
- : _scan_keys(scan_keys), _max_scan_key_num(max_scan_key_num) {}
- template <class T>
- Status operator()(T& v) {
- return _scan_keys.extend_scan_key(v, _max_scan_key_num);
- }
-
- private:
- OlapScanKeys& _scan_keys;
- int32_t _max_scan_key_num;
- };
-
- typedef boost::variant<std::list<std::string>> string_list;
-
- class ToOlapFilterVisitor : public boost::static_visitor<void> {
- public:
- template <class T, class P>
- void operator()(T& v, P& v2) const {
- v.to_olap_filter(v2);
- }
- };
class MergeComparison {
public:
@@ -172,7 +131,7 @@ protected:
void transfer_thread(RuntimeState* state);
void scanner_thread(OlapScanner* scanner);
- Status add_one_batch(RowBatchInterface* row_batch);
+ Status add_one_batch(RowBatch* row_batch);
// Write debug string of this into out.
virtual void debug_string(int indentation_level, std::stringstream* out) const;
@@ -236,9 +195,9 @@ protected:
// Pool for storing allocated scanner objects. We don't want to use the
// runtime pool to ensure that the scanner objects are deleted before this
// object is.
- std::unique_ptr<ObjectPool> _scanner_pool;
+ ObjectPool _scanner_pool;
- boost::thread_group _transfer_thread;
+ std::shared_ptr<std::thread> _transfer_thread;
// Keeps track of total splits and the number finished.
ProgressUpdater _progress;
@@ -253,14 +212,14 @@ protected:
std::condition_variable _row_batch_added_cv;
std::condition_variable _row_batch_consumed_cv;
- std::list<RowBatchInterface*> _materialized_row_batches;
+ std::list<RowBatch*> _materialized_row_batches;
std::mutex _scan_batches_lock;
std::condition_variable _scan_batch_added_cv;
int64_t _running_thread = 0;
std::condition_variable _scan_thread_exit_cv;
- std::list<RowBatchInterface*> _scan_row_batches;
+ std::list<RowBatch*> _scan_row_batches;
std::list<OlapScanner*> _olap_scanners;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org