You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/11/30 14:04:21 UTC

[incubator-doris] branch master updated: [fix](broker-load) BE may crash when using preceding filter in broker or routine load (#7193)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c4aeab  [fix](broker-load) BE may crash when using preceding filter in broker or routine load (#7193)
6c4aeab is described below

commit 6c4aeab06f3c84f387c523758a8145c7a6ecf970
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Nov 30 22:04:05 2021 +0800

    [fix](broker-load) BE may crash when using preceding filter in broker or routine load (#7193)
    
    The broker scan node has two tuple descriptors:
    one is the dest tuple and the other is the src tuple.
    The src tuple is used to read the lines of the original file,
    and the dest tuple is used to save the converted lines.
    The preceding filter is executed on the src tuple, so the src tuple descriptor
    should be used to initialize the filter expression.
---
 be/src/exec/base_scanner.cpp                | 17 +++++++++++++--
 be/src/exec/base_scanner.h                  |  9 +++++---
 be/src/exec/broker_scan_node.cpp            | 34 ++++++-----------------------
 be/src/exec/broker_scan_node.h              |  9 +++++---
 be/src/exec/broker_scanner.cpp              |  5 +++--
 be/src/exec/broker_scanner.h                |  2 +-
 be/src/exec/json_scanner.cpp                |  5 +++--
 be/src/exec/json_scanner.h                  |  2 +-
 be/src/exec/orc_scanner.cpp                 |  5 +++--
 be/src/exec/orc_scanner.h                   |  2 +-
 be/src/exec/parquet_scanner.cpp             |  5 +++--
 be/src/exec/parquet_scanner.h               |  2 +-
 be/src/olap/push_handler.cpp                |  2 +-
 be/src/olap/push_handler.h                  |  2 +-
 be/test/exec/broker_scanner_test.cpp        |  2 +-
 be/test/exec/multi_bytes_separator_test.cpp |  4 ++--
 be/test/exec/orc_scanner_test.cpp           |  2 +-
 17 files changed, 56 insertions(+), 53 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 2b1c075..14bad74 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -31,7 +31,7 @@ namespace doris {
 
 BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
                          const TBrokerScanRangeParams& params,
-                         const std::vector<ExprContext*>& pre_filter_ctxs,
+                         const std::vector<TExpr>& pre_filter_texprs,
                          ScannerCounter* counter)
         : _state(state),
           _params(params),
@@ -47,7 +47,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
 #endif
           _mem_pool(_mem_tracker.get()),
           _dest_tuple_desc(nullptr),
-          _pre_filter_ctxs(pre_filter_ctxs),
+          _pre_filter_texprs(pre_filter_texprs),
           _strict_mode(false),
           _line_counter(0),
           _profile(profile),
@@ -101,6 +101,13 @@ Status BaseScanner::init_expr_ctxes() {
                                       std::vector<TupleId>({_params.src_tuple_id}),
                                       std::vector<bool>({false})));
 
+    // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor
+    if (!_pre_filter_texprs.empty()) {
+        RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs));
+        RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
+        RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
+    }
+
     // Construct dest slots information
     _dest_tuple_desc = _state->desc_tbl().get_tuple_descriptor(_params.dest_tuple_id);
     if (_dest_tuple_desc == nullptr) {
@@ -234,5 +241,11 @@ void BaseScanner::free_expr_local_allocations() {
     }
 }
 
+void BaseScanner::close() {
+    if (!_pre_filter_ctxs.empty()) {
+        Expr::close(_pre_filter_ctxs, _state);
+    }
+}
+
 
 } // namespace doris
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index 957dffc..8990532 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -46,7 +46,7 @@ struct ScannerCounter {
 class BaseScanner {
 public:
     BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
-                const std::vector<ExprContext*>& pre_filter_ctxs, ScannerCounter* counter);
+                const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
     virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); };
 
     virtual Status init_expr_ctxes();
@@ -57,7 +57,7 @@ public:
     virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof) = 0;
 
     // Close this scanner
-    virtual void close() = 0;
+    virtual void close();
     bool fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
 
     void fill_slots_of_columns_from_path(int start,
@@ -89,7 +89,10 @@ protected:
     std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
 
     // to filter src tuple directly
-	const std::vector<ExprContext*>& _pre_filter_ctxs;
+    // `_pre_filter_texprs` holds the original thrift exprs passed from the scan node;
+    // they are converted to `_pre_filter_ctxs` when the scanner is opened.
+    const std::vector<TExpr> _pre_filter_texprs;
+    std::vector<ExprContext*> _pre_filter_ctxs;
 
     bool _strict_mode;
 
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 7c696b6..c46f7ab 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -51,8 +51,7 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
     auto& broker_scan_node = tnode.broker_scan_node;
 
     if (broker_scan_node.__isset.pre_filter_exprs) {
-        RETURN_IF_ERROR(Expr::create_expr_trees(_pool, broker_scan_node.pre_filter_exprs,
-                                                &_pre_filter_ctxs));
+        _pre_filter_texprs = broker_scan_node.pre_filter_exprs;
     }
 
     return Status::OK();
@@ -80,10 +79,6 @@ Status BrokerScanNode::prepare(RuntimeState* state) {
         }
     }
 
-    if (_pre_filter_ctxs.size() > 0) {
-        RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, state, row_desc(), expr_mem_tracker()));
-    }
-
     // Profile
     _wait_scanner_timer = ADD_TIMER(runtime_profile(), "WaitScannerTime");
 
@@ -96,10 +91,6 @@ Status BrokerScanNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
 
-    if (_pre_filter_ctxs.size() > 0) {
-        RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, state));
-    }
-
     RETURN_IF_ERROR(start_scanners());
 
     return Status::OK();
@@ -208,10 +199,6 @@ Status BrokerScanNode::close(RuntimeState* state) {
         _scanner_threads[i].join();
     }
 
-    if (_pre_filter_ctxs.size() > 0) {
-        Expr::close(_pre_filter_ctxs, state);
-    }
-
     // Close
     _batch_queue.clear();
 
@@ -229,40 +216,38 @@ void BrokerScanNode::debug_string(int ident_level, std::stringstream* out) const
 }
 
 std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRange& scan_range,
-                                                            const std::vector<ExprContext*>& pre_filter_ctxs,
                                                             ScannerCounter* counter) {
     BaseScanner* scan = nullptr;
     switch (scan_range.ranges[0].format_type) {
     case TFileFormatType::FORMAT_PARQUET:
         scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
                                   scan_range.ranges, scan_range.broker_addresses,
-                                  pre_filter_ctxs, counter);
+                                  _pre_filter_texprs, counter);
         break;
     case TFileFormatType::FORMAT_ORC:
         scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
                               scan_range.ranges, scan_range.broker_addresses,
-                              pre_filter_ctxs, counter);
+                              _pre_filter_texprs, counter);
         break;
     case TFileFormatType::FORMAT_JSON:
         scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params,
                                scan_range.ranges, scan_range.broker_addresses,
-                               pre_filter_ctxs, counter);
+                               _pre_filter_texprs, counter);
         break;
     default:
         scan = new BrokerScanner(_runtime_state, runtime_profile(), scan_range.params,
                                  scan_range.ranges, scan_range.broker_addresses,
-                                 pre_filter_ctxs, counter);
+                                 _pre_filter_texprs, counter);
     }
     std::unique_ptr<BaseScanner> scanner(scan);
     return scanner;
 }
 
 Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
-                                    const std::vector<ExprContext*>& pre_filter_ctxs,
                                     const std::vector<ExprContext*>& conjunct_ctxs,
                                     ScannerCounter* counter) {
     //create scanner object and open
-    std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, pre_filter_ctxs, counter);
+    std::unique_ptr<BaseScanner> scanner = create_scanner(scan_range, counter);
     RETURN_IF_ERROR(scanner->open());
     bool scanner_eof = false;
 
@@ -359,16 +344,11 @@ void BrokerScanNode::scanner_worker(int start_idx, int length) {
         LOG(WARNING) << "Clone conjuncts failed.";
     }
 
-    std::vector<ExprContext*> pre_filter_ctxs;
-    if (status.ok()) {
-        status = Expr::clone_if_not_exists(_pre_filter_ctxs, _runtime_state, &pre_filter_ctxs);
-    }
-
     ScannerCounter counter;
     for (int i = 0; i < length && status.ok(); ++i) {
         const TBrokerScanRange& scan_range =
                 _scan_ranges[start_idx + i].scan_range.broker_scan_range;
-        status = scanner_scan(scan_range, pre_filter_ctxs, scanner_expr_ctxs, &counter);
+        status = scanner_scan(scan_range, scanner_expr_ctxs, &counter);
         if (!status.ok()) {
             LOG(WARNING) << "Scanner[" << start_idx + i
                          << "] process failed. status=" << status.get_error_msg();
diff --git a/be/src/exec/broker_scan_node.h b/be/src/exec/broker_scan_node.h
index a05e7c5..3958140 100644
--- a/be/src/exec/broker_scan_node.h
+++ b/be/src/exec/broker_scan_node.h
@@ -84,11 +84,9 @@ private:
 
     // Scan one range
     Status scanner_scan(const TBrokerScanRange& scan_range,
-                        const std::vector<ExprContext*>& pre_filter_ctxs,
                         const std::vector<ExprContext*>& conjunct_ctxs, ScannerCounter* counter);
 
     std::unique_ptr<BaseScanner> create_scanner(const TBrokerScanRange& scan_range,
-                                                const std::vector<ExprContext*>& pre_filter_ctxs,
                                                 ScannerCounter* counter);
 
 private:
@@ -115,7 +113,12 @@ private:
 
     int _max_buffered_batches;
 
-    std::vector<ExprContext*> _pre_filter_ctxs;
+    // The original preceding filter exprs.
+    // These exprs are converted to expr contexts
+    // in XXXScanner,
+    // because the row descriptor used for these exprs is `src_row_desc`,
+    // which is initialized in XXXScanner.
+    std::vector<TExpr> _pre_filter_texprs;
 
     RuntimeProfile::Counter* _wait_scanner_timer;
 };
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index b265eb0..16c903e 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -47,9 +47,9 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                              const TBrokerScanRangeParams& params,
                              const std::vector<TBrokerRangeDesc>& ranges,
                              const std::vector<TNetworkAddress>& broker_addresses,
-                             const std::vector<ExprContext*>& pre_filter_ctxs,
+                             const std::vector<TExpr>& pre_filter_texprs,
                              ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
+        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
           _ranges(ranges),
           _broker_addresses(broker_addresses),
           _cur_file_reader(nullptr),
@@ -302,6 +302,7 @@ Status BrokerScanner::open_line_reader() {
 }
 
 void BrokerScanner::close() {
+    BaseScanner::close();
     if (_cur_decompressor != nullptr) {
         delete _cur_decompressor;
         _cur_decompressor = nullptr;
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 1da3e11..b13a791 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -56,7 +56,7 @@ public:
     BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
                   const TBrokerScanRangeParams& params, const std::vector<TBrokerRangeDesc>& ranges,
                   const std::vector<TNetworkAddress>& broker_addresses,
-                  const std::vector<ExprContext*>& pre_filter_ctxs, ScannerCounter* counter);
+                  const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);
     ~BrokerScanner();
 
     // Open this scanner, will initialize information need to
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 5cba337..022a35a 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -38,8 +38,8 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile,
                          const TBrokerScanRangeParams& params,
                          const std::vector<TBrokerRangeDesc>& ranges,
                          const std::vector<TNetworkAddress>& broker_addresses,
-                         const std::vector<ExprContext*>& pre_filter_ctxs, ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
+                         const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
+        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
           _ranges(ranges),
           _broker_addresses(broker_addresses),
           _cur_file_reader(nullptr),
@@ -247,6 +247,7 @@ Status JsonScanner::open_json_reader() {
 }
 
 void JsonScanner::close() {
+    BaseScanner::close();
     if (_cur_json_reader != nullptr) {
         delete _cur_json_reader;
         _cur_json_reader = nullptr;
diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h
index fdfe1e0..eadb98c 100644
--- a/be/src/exec/json_scanner.h
+++ b/be/src/exec/json_scanner.h
@@ -57,7 +57,7 @@ public:
     JsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
                 const std::vector<TBrokerRangeDesc>& ranges,
                 const std::vector<TNetworkAddress>& broker_addresses,
-                const std::vector<ExprContext*>& pre_filter_ctxs,
+                const std::vector<TExpr>& pre_filter_texprs,
                 ScannerCounter* counter);
     ~JsonScanner();
 
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 8732b7a..dd34e60 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -120,9 +120,9 @@ ORCScanner::ORCScanner(RuntimeState* state, RuntimeProfile* profile,
                        const TBrokerScanRangeParams& params,
                        const std::vector<TBrokerRangeDesc>& ranges,
                        const std::vector<TNetworkAddress>& broker_addresses,
-                       const std::vector<ExprContext*>& pre_filter_ctxs,
+                       const std::vector<TExpr>& pre_filter_texprs,
                        ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
+        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
           _ranges(ranges),
           _broker_addresses(broker_addresses),
           // _splittable(params.splittable),
@@ -457,6 +457,7 @@ Status ORCScanner::open_next_reader() {
 }
 
 void ORCScanner::close() {
+    BaseScanner::close();
     _batch = nullptr;
     _reader.reset(nullptr);
     _row_reader.reset(nullptr);
diff --git a/be/src/exec/orc_scanner.h b/be/src/exec/orc_scanner.h
index c06dc0b..039eab2 100644
--- a/be/src/exec/orc_scanner.h
+++ b/be/src/exec/orc_scanner.h
@@ -30,7 +30,7 @@ public:
     ORCScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
                const std::vector<TBrokerRangeDesc>& ranges,
                const std::vector<TNetworkAddress>& broker_addresses,
-               const std::vector<ExprContext*>& pre_filter_ctxs,
+               const std::vector<TExpr>& pre_filter_texprs,
                ScannerCounter* counter);
 
     ~ORCScanner() override;
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index f574c83..378921f 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -50,9 +50,9 @@ ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile,
                                const TBrokerScanRangeParams& params,
                                const std::vector<TBrokerRangeDesc>& ranges,
                                const std::vector<TNetworkAddress>& broker_addresses,
-                               const std::vector<ExprContext*>& pre_filter_ctxs,
+                               const std::vector<TExpr>& pre_filter_texprs,
                                ScannerCounter* counter)
-        : BaseScanner(state, profile, params, pre_filter_ctxs, counter),
+        : BaseScanner(state, profile, params, pre_filter_texprs, counter),
           _ranges(ranges),
           _broker_addresses(broker_addresses),
           // _splittable(params.splittable),
@@ -196,6 +196,7 @@ Status ParquetScanner::open_next_reader() {
 }
 
 void ParquetScanner::close() {
+    BaseScanner::close();
     if (_cur_file_reader != nullptr) {
         if (_stream_load_pipe != nullptr) {
             _stream_load_pipe.reset();
diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h
index a23b91f..72e58ce 100644
--- a/be/src/exec/parquet_scanner.h
+++ b/be/src/exec/parquet_scanner.h
@@ -52,7 +52,7 @@ public:
                    const TBrokerScanRangeParams& params,
                    const std::vector<TBrokerRangeDesc>& ranges,
                    const std::vector<TNetworkAddress>& broker_addresses,
-                   const std::vector<ExprContext*>& pre_filter_ctxs,
+                   const std::vector<TExpr>& pre_filter_texprs,
                    ScannerCounter* counter);
 
     ~ParquetScanner();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 747568c..e75324f 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -910,7 +910,7 @@ OLAPStatus PushBrokerReader::init(const Schema* schema, const TBrokerScanRange&
     case TFileFormatType::FORMAT_PARQUET:
         scanner = new ParquetScanner(_runtime_state.get(), _runtime_profile, t_scan_range.params,
                                      t_scan_range.ranges, t_scan_range.broker_addresses,
-                                     _pre_filter_ctxs, _counter.get());
+                                     _pre_filter_texprs, _counter.get());
         break;
     default:
         LOG(WARNING) << "Unsupported file format type: " << t_scan_range.ranges[0].format_type;
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index d782ee4..e67b50a 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -214,7 +214,7 @@ private:
     std::unique_ptr<ScannerCounter> _counter;
     std::unique_ptr<BaseScanner> _scanner;
     // Not used, just for placeholding
-    std::vector<ExprContext*> _pre_filter_ctxs;
+    std::vector<TExpr> _pre_filter_texprs;
 };
 
 } // namespace doris
diff --git a/be/test/exec/broker_scanner_test.cpp b/be/test/exec/broker_scanner_test.cpp
index ade80a1..9f0a4ee 100644
--- a/be/test/exec/broker_scanner_test.cpp
+++ b/be/test/exec/broker_scanner_test.cpp
@@ -68,7 +68,7 @@ private:
     DescriptorTbl* _desc_tbl;
     std::vector<TNetworkAddress> _addresses;
     ScannerCounter _counter;
-    std::vector<doris::ExprContext*> _pre_filter; 
+    std::vector<TExpr> _pre_filter; 
 };
 
 void BrokerScannerTest::init_desc_table() {
diff --git a/be/test/exec/multi_bytes_separator_test.cpp b/be/test/exec/multi_bytes_separator_test.cpp
index 082e58f..b6dc149 100644
--- a/be/test/exec/multi_bytes_separator_test.cpp
+++ b/be/test/exec/multi_bytes_separator_test.cpp
@@ -57,8 +57,8 @@ TEST_F(MultiBytesSeparatorTest, normal) {
 
     const std::vector<TBrokerRangeDesc> ranges;
     const std::vector<TNetworkAddress> broker_addresses;
-    const std::vector<ExprContext*> pre_filter_ctxs;
-    BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_ctxs, nullptr);
+    const std::vector<TExpr> pre_filter_texprs;
+    BrokerScanner scanner(nullptr, nullptr, params, ranges, broker_addresses, pre_filter_texprs, nullptr);
 
 #define private public
 
diff --git a/be/test/exec/orc_scanner_test.cpp b/be/test/exec/orc_scanner_test.cpp
index daf9bb8..19c7cf9 100644
--- a/be/test/exec/orc_scanner_test.cpp
+++ b/be/test/exec/orc_scanner_test.cpp
@@ -66,7 +66,7 @@ private:
     DescriptorTbl* _desc_tbl;
     std::vector<TNetworkAddress> _addresses;
     ScannerCounter _counter;
-    std::vector<doris::ExprContext*> _pre_filter;
+    std::vector<TExpr> _pre_filter;
 };
 
 TEST_F(OrcScannerTest, normal) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org