You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/26 15:52:09 UTC
[incubator-doris] 06/15: [fix](broker load) sync the workflow of BrokerScanner to other Scanner to avoid oom (#9173)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 3dd722bc4ca2850a34e9df691ff666e8ec500688
Author: SleepyBear <ka...@live.cn>
AuthorDate: Mon Apr 25 10:01:42 2022 +0800
[fix](broker load) sync the workflow of BrokerScanner to other Scanner to avoid oom (#9173)
---
be/src/exec/base_scanner.cpp | 13 ++++++++++++-
be/src/exec/base_scanner.h | 6 +++++-
be/src/exec/broker_scanner.cpp | 12 +++---------
be/src/exec/broker_scanner.h | 2 +-
be/src/exec/json_scanner.cpp | 3 +--
be/src/exec/orc_scanner.cpp | 3 +--
be/src/exec/parquet_scanner.cpp | 3 +--
7 files changed, 24 insertions(+), 18 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index ab004b17f0..cd9bf8685e 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() {
return Status::OK();
}
-Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
+Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) {
+ RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool));
+ if (_success) {
+ free_expr_local_allocations();
+ *fill_tuple = true;
+ } else {
+ *fill_tuple = false;
+ }
+ return Status::OK();
+}
+
+Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
// filter src tuple by preceding filter first
if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) {
_counter->num_rows_unselected++;
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index a745e938d2..21abf080f9 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -58,12 +58,13 @@ public:
// Close this scanner
virtual void close() = 0;
- Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
+ Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);
void free_expr_local_allocations();
+
protected:
RuntimeState* _state;
const TBrokerScanRangeParams& _params;
@@ -106,6 +107,9 @@ protected:
// Used to record whether a row of data is successfully read.
bool _success = false;
bool _scanner_eof = false;
+
+private:
+ Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
};
} /* namespace doris */
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index 96e9e315c1..00c31d3c1f 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -114,13 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo
{
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool));
- if (_success) {
- free_expr_local_allocations();
- *fill_tuple = true;
- } else {
- *fill_tuple = false;
- }
+ RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple));
break; // break always
}
}
@@ -461,14 +455,14 @@ bool is_null(const Slice& slice) {
}
// Convert one row to this tuple
-Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) {
+Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) {
RETURN_IF_ERROR(_line_to_src_tuple(line));
if (!_success) {
// If not success, which means we met an invalid row, return.
return Status::OK();
}
- return fill_dest_tuple(tuple, tuple_pool);
+ return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
}
// Convert one row to this tuple
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index d6deb7d64f..ddd8bd9c78 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -87,7 +87,7 @@ private:
// Convert one row to one tuple
// 'ptr' and 'len' is csv text line
// output is tuple
- Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool);
+ Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);
Status _line_to_src_tuple(const Slice& line);
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 67a58c4216..d651545806 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -96,8 +96,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
- *fill_tuple = _success;
+ RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
if (_scanner_eof) {
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 0c69945271..8537189348 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -357,8 +357,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
- *fill_tuple = _success;
+ RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break;
}
if (_scanner_eof) {
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 9a85a1253a..3295dc4bc7 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
- *fill_tuple = _success;
+ RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
if (_scanner_eof) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org