You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/26 15:52:09 UTC

[incubator-doris] 06/15: [fix](broker load) sync the workflow of BrokerScanner with the other scanners to avoid OOM (#9173)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 3dd722bc4ca2850a34e9df691ff666e8ec500688
Author: SleepyBear <ka...@live.cn>
AuthorDate: Mon Apr 25 10:01:42 2022 +0800

    [fix](broker load) sync the workflow of BrokerScanner with the other scanners to avoid OOM (#9173)
---
 be/src/exec/base_scanner.cpp    | 13 ++++++++++++-
 be/src/exec/base_scanner.h      |  6 +++++-
 be/src/exec/broker_scanner.cpp  | 12 +++---------
 be/src/exec/broker_scanner.h    |  2 +-
 be/src/exec/json_scanner.cpp    |  3 +--
 be/src/exec/orc_scanner.cpp     |  3 +--
 be/src/exec/parquet_scanner.cpp |  3 +--
 7 files changed, 24 insertions(+), 18 deletions(-)

diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index ab004b17f0..cd9bf8685e 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() {
     return Status::OK();
 }
 
-Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
+Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) {
+    RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool));
+    if (_success) {
+        free_expr_local_allocations();
+        *fill_tuple = true;
+    } else {
+        *fill_tuple = false;
+    }
+    return Status::OK();
+}
+
+Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
     // filter src tuple by preceding filter first
     if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) {
         _counter->num_rows_unselected++;
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index a745e938d2..21abf080f9 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -58,12 +58,13 @@ public:

     // Close this scanner
     virtual void close() = 0;
-    Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
+    Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);

     void fill_slots_of_columns_from_path(int start,
                                          const std::vector<std::string>& columns_from_path);

     void free_expr_local_allocations();
+
 protected:
     RuntimeState* _state;
     const TBrokerScanRangeParams& _params;
@@ -106,6 +107,9 @@ protected:
     // Used to record whether a row of data is successfully read.
     bool _success = false;
     bool _scanner_eof = false;
+
+private:
+    Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
 };
 
 } /* namespace doris */
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 96e9e315c1..00c31d3c1f 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -114,13 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo
         {
             COUNTER_UPDATE(_rows_read_counter, 1);
             SCOPED_TIMER(_materialize_timer);
-            RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool));
-            if (_success) {
-                free_expr_local_allocations();
-                *fill_tuple = true;
-            } else {
-                *fill_tuple = false;
-            }
+            RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple));
             break; // break always
         }
     }
@@ -461,14 +455,16 @@ bool is_null(const Slice& slice) {
 }

 // Convert one row to this tuple
-Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) {
+Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) {
     RETURN_IF_ERROR(_line_to_src_tuple(line));
     if (!_success) {
         // If not success, which means we met an invalid row, return.
+        // Always write the out-param so the caller never sees a stale value.
+        *fill_tuple = false;
         return Status::OK();
     }

-    return fill_dest_tuple(tuple, tuple_pool);
+    return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
 }

 // Convert one row to this tuple
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index d6deb7d64f..ddd8bd9c78 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -87,7 +87,7 @@ private:
     // Convert one row to one tuple
     //  'ptr' and 'len' is csv text line
     //  output is tuple
-    Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool);
+    Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);
 
     Status _line_to_src_tuple(const Slice& line);
 
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 67a58c4216..d651545806 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -96,8 +96,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool
         }
         COUNTER_UPDATE(_rows_read_counter, 1);
         SCOPED_TIMER(_materialize_timer);
-        RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
-        *fill_tuple = _success;
+        RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
         break; // break always
     }
     if (_scanner_eof) {
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 0c69945271..8537189348 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -357,8 +357,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
             }
             COUNTER_UPDATE(_rows_read_counter, 1);
             SCOPED_TIMER(_materialize_timer);
-            RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
-            *fill_tuple = _success;
+            RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
             break;
         }
         if (_scanner_eof) {
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 9a85a1253a..3295dc4bc7 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo
 
         COUNTER_UPDATE(_rows_read_counter, 1);
         SCOPED_TIMER(_materialize_timer);
-        RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
-        *fill_tuple = _success;
+        RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
         break; // break always
     }
     if (_scanner_eof) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org