You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/27 02:33:11 UTC

[incubator-doris] branch master updated: [Refactor] refactor olap_scan_node: discard boost, remove dynamic_cast (#6622)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 850cf10  [Refactor] refactor olap_scan_node: discard boost, remove dynamic_cast (#6622)
850cf10 is described below

commit 850cf1099189b760347c83a31b3dc85f6eee1cbc
Author: thinker <zc...@qq.com>
AuthorDate: Mon Sep 27 10:32:57 2021 +0800

    [Refactor] refactor olap_scan_node: discard boost, remove dynamic_cast (#6622)
    
    1. Refactor olap_scan_node: replace boost with standard-library equivalents and remove dynamic_cast.
    2. Use the move overload of push_back instead of the copy overload.
---
 be/src/exec/olap_common.h      |  4 ++--
 be/src/exec/olap_scan_node.cpp | 41 +++++++++++++--------------------
 be/src/exec/olap_scan_node.h   | 51 +++++-------------------------------------
 3 files changed, 22 insertions(+), 74 deletions(-)

diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index 94ecd1d..7ad3d5e 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -19,9 +19,9 @@
 #define DORIS_BE_SRC_QUERY_EXEC_OLAP_COMMON_H
 
 #include <stdint.h>
+#include <variant>
 
 #include <boost/lexical_cast.hpp>
-#include <boost/variant.hpp>
 #include <map>
 #include <sstream>
 #include <string>
@@ -317,7 +317,7 @@ private:
     bool _is_convertible;
 };
 
-typedef boost::variant<
+typedef std::variant<
         ColumnValueRange<int8_t>, ColumnValueRange<int16_t>, ColumnValueRange<int32_t>,
         ColumnValueRange<int64_t>, ColumnValueRange<__int128>, ColumnValueRange<StringValue>,
         ColumnValueRange<DateTimeValue>, ColumnValueRange<DecimalV2Value>, ColumnValueRange<bool>>
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 93d7947..6378fa2 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -18,7 +18,6 @@
 #include "exec/olap_scan_node.h"
 
 #include <algorithm>
-#include <boost/variant.hpp>
 #include <iostream>
 #include <string>
 #include <utility>
@@ -50,7 +49,6 @@ OlapScanNode::OlapScanNode(ObjectPool* pool, const TPlanNode& tnode, const Descr
           _tuple_desc(NULL),
           _tuple_idx(0),
           _eos(false),
-          _scanner_pool(new ObjectPool()),
           _max_materialized_row_batches(config::doris_scanner_queue_size),
           _start(false),
           _scanner_done(false),
@@ -186,11 +184,9 @@ Status OlapScanNode::prepare(RuntimeState* state) {
             _collection_slots.push_back(slots[i]);
         }
 
-        if (!slots[i]->type().is_string_type()) {
-            continue;
+        if (slots[i]->type().is_string_type()) {
+            _string_slots.push_back(slots[i]);
         }
-
-        _string_slots.push_back(slots[i]);
     }
 
     _runtime_state = state;
@@ -297,7 +293,7 @@ Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
         }
 
         if (!_materialized_row_batches.empty()) {
-            materialized_batch = dynamic_cast<RowBatch*>(_materialized_row_batches.front());
+            materialized_batch = _materialized_row_batches.front();
             DCHECK(materialized_batch != NULL);
             _materialized_row_batches.pop_front();
         }
@@ -377,7 +373,7 @@ Status OlapScanNode::close(RuntimeState* state) {
     _scan_batch_added_cv.notify_all();
 
     // join transfer thread
-    _transfer_thread.join_all();
+    _transfer_thread->join();
 
     // clear some row batch in queue
     for (auto row_batch : _materialized_row_batches) {
@@ -621,17 +617,11 @@ Status OlapScanNode::normalize_conjuncts() {
 
 Status OlapScanNode::build_olap_filters() {
     for (auto& iter : _column_value_ranges) {
-        ToOlapFilterVisitor visitor;
-        boost::variant<std::vector<TCondition>> filters;
-        boost::apply_visitor(visitor, iter.second, filters);
-
-        std::vector<TCondition> new_filters = boost::get<std::vector<TCondition>>(filters);
-        if (new_filters.empty()) {
-            continue;
-        }
+        std::vector<TCondition> filters;
+        std::visit([&](auto &&range) { range.to_olap_filter(filters); }, iter.second);
 
-        for (const auto& filter : new_filters) {
-            _olap_filter.push_back(filter);
+        for (const auto& filter : filters) {
+            _olap_filter.push_back(std::move(filter));
         }
     }
 
@@ -648,13 +638,12 @@ Status OlapScanNode::build_scan_key() {
 
     for (int column_index = 0; column_index < column_names.size() && !_scan_keys.has_range_value();
          ++column_index) {
-        auto column_range_iter = _column_value_ranges.find(column_names[column_index]);
-        if (_column_value_ranges.end() == column_range_iter) {
+        auto iter = _column_value_ranges.find(column_names[column_index]);
+        if (_column_value_ranges.end() == iter) {
             break;
         }
 
-        ExtendScanKeyVisitor visitor(_scan_keys, _max_scan_key_num);
-        RETURN_IF_ERROR(boost::apply_visitor(visitor, column_range_iter->second));
+        RETURN_IF_ERROR(std::visit([&](auto &&range) { return _scan_keys.extend_scan_key(range, _max_scan_key_num); }, iter->second));
     }
 
     VLOG_CRITICAL << _scan_keys.debug_string();
@@ -790,7 +779,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
                                                    _need_agg_finalize, *scan_range, scanner_ranges);
             // add scanner to pool before doing prepare.
             // so that scanner can be automatically deconstructed if prepare failed.
-            _scanner_pool->add(scanner);
+            _scanner_pool.add(scanner);
             RETURN_IF_ERROR(scanner->prepare(*scan_range, scanner_ranges, _olap_filter,
                                              _bloom_filters_push_down));
 
@@ -807,7 +796,7 @@ Status OlapScanNode::start_scan_thread(RuntimeState* state) {
     ss << "ScanThread complete (node=" << id() << "):";
     _progress = ProgressUpdater(ss.str(), _olap_scanners.size(), 1);
 
-    _transfer_thread.add_thread(new boost::thread(&OlapScanNode::transfer_thread, this, state));
+    _transfer_thread = std::make_shared<std::thread>(&OlapScanNode::transfer_thread, this, state);
 
     return Status::OK();
 }
@@ -1443,7 +1432,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
             }
         }
 
-        RowBatchInterface* scan_batch = NULL;
+        RowBatch* scan_batch = NULL;
         {
             // 1 scanner idle task not empty, assign new scanner task
             std::unique_lock<std::mutex> l(_scan_batches_lock);
@@ -1658,7 +1647,7 @@ void OlapScanNode::scanner_thread(OlapScanner* scanner) {
     _scan_thread_exit_cv.notify_one();
 }
 
-Status OlapScanNode::add_one_batch(RowBatchInterface* row_batch) {
+Status OlapScanNode::add_one_batch(RowBatch* row_batch) {
     {
         std::unique_lock<std::mutex> l(_row_batches_lock);
 
diff --git a/be/src/exec/olap_scan_node.h b/be/src/exec/olap_scan_node.h
index c7142a5..983f051 100644
--- a/be/src/exec/olap_scan_node.h
+++ b/be/src/exec/olap_scan_node.h
@@ -19,8 +19,6 @@
 #define DORIS_BE_SRC_QUERY_EXEC_OLAP_SCAN_NODE_H
 
 #include <atomic>
-#include <boost/thread.hpp>
-#include <boost/variant/static_visitor.hpp>
 #include <condition_variable>
 #include <queue>
 
@@ -66,45 +64,6 @@ protected:
         Tuple* tuple;
         int id;
     } HeapType;
-    class IsFixedValueRangeVisitor : public boost::static_visitor<bool> {
-    public:
-        template <class T>
-        bool operator()(T& v) const {
-            return v.is_fixed_value_range();
-        }
-    };
-
-    class GetFixedValueSizeVisitor : public boost::static_visitor<size_t> {
-    public:
-        template <class T>
-        size_t operator()(T& v) const {
-            return v.get_fixed_value_size();
-        }
-    };
-
-    class ExtendScanKeyVisitor : public boost::static_visitor<Status> {
-    public:
-        ExtendScanKeyVisitor(OlapScanKeys& scan_keys, int32_t max_scan_key_num)
-                : _scan_keys(scan_keys), _max_scan_key_num(max_scan_key_num) {}
-        template <class T>
-        Status operator()(T& v) {
-            return _scan_keys.extend_scan_key(v, _max_scan_key_num);
-        }
-
-    private:
-        OlapScanKeys& _scan_keys;
-        int32_t _max_scan_key_num;
-    };
-
-    typedef boost::variant<std::list<std::string>> string_list;
-
-    class ToOlapFilterVisitor : public boost::static_visitor<void> {
-    public:
-        template <class T, class P>
-        void operator()(T& v, P& v2) const {
-            v.to_olap_filter(v2);
-        }
-    };
 
     class MergeComparison {
     public:
@@ -172,7 +131,7 @@ protected:
     void transfer_thread(RuntimeState* state);
     void scanner_thread(OlapScanner* scanner);
 
-    Status add_one_batch(RowBatchInterface* row_batch);
+    Status add_one_batch(RowBatch* row_batch);
 
     // Write debug string of this into out.
     virtual void debug_string(int indentation_level, std::stringstream* out) const;
@@ -236,9 +195,9 @@ protected:
     // Pool for storing allocated scanner objects.  We don't want to use the
     // runtime pool to ensure that the scanner objects are deleted before this
     // object is.
-    std::unique_ptr<ObjectPool> _scanner_pool;
+    ObjectPool _scanner_pool;
 
-    boost::thread_group _transfer_thread;
+    std::shared_ptr<std::thread> _transfer_thread;
 
     // Keeps track of total splits and the number finished.
     ProgressUpdater _progress;
@@ -253,14 +212,14 @@ protected:
     std::condition_variable _row_batch_added_cv;
     std::condition_variable _row_batch_consumed_cv;
 
-    std::list<RowBatchInterface*> _materialized_row_batches;
+    std::list<RowBatch*> _materialized_row_batches;
 
     std::mutex _scan_batches_lock;
     std::condition_variable _scan_batch_added_cv;
     int64_t _running_thread = 0;
     std::condition_variable _scan_thread_exit_cv;
 
-    std::list<RowBatchInterface*> _scan_row_batches;
+    std::list<RowBatch*> _scan_row_batches;
 
     std::list<OlapScanner*> _olap_scanners;
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org