You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/25 02:01:47 UTC
[incubator-doris] branch master updated: [fix](broker load) sync the workflow of BrokerScanner to other Scanner to avoid oom (#9173)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c3d0fee01b [fix](broker load) sync the workflow of BrokerScanner to other Scanner to avoid oom (#9173)
c3d0fee01b is described below
commit c3d0fee01b98ca153cf88d42be297b35ca595764
Author: SleepyBear <ka...@live.cn>
AuthorDate: Mon Apr 25 10:01:42 2022 +0800
[fix](broker load) sync the workflow of BrokerScanner to other Scanner to avoid oom (#9173)
---
be/src/exec/base_scanner.cpp | 13 ++++++++++++-
be/src/exec/base_scanner.h | 6 +++++-
be/src/exec/broker_scanner.cpp | 12 +++---------
be/src/exec/broker_scanner.h | 2 +-
be/src/exec/json_scanner.cpp | 3 +--
be/src/exec/orc_scanner.cpp | 3 +--
be/src/exec/parquet_scanner.cpp | 3 +--
7 files changed, 24 insertions(+), 18 deletions(-)
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 06331023e8..796defc6d0 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() {
return Status::OK();
}
-Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
+Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) {
+ RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool));
+ if (_success) {
+ free_expr_local_allocations();
+ *fill_tuple = true;
+ } else {
+ *fill_tuple = false;
+ }
+ return Status::OK();
+}
+
+Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
// filter src tuple by preceding filter first
if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) {
_counter->num_rows_unselected++;
diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h
index bce0f4b8ca..c01891c381 100644
--- a/be/src/exec/base_scanner.h
+++ b/be/src/exec/base_scanner.h
@@ -58,12 +58,13 @@ public:
// Close this scanner
virtual void close() = 0;
- Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
+ Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);
void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);
void free_expr_local_allocations();
+
protected:
RuntimeState* _state;
const TBrokerScanRangeParams& _params;
@@ -106,6 +107,9 @@ protected:
// Used to record whether a row of data is successfully read.
bool _success = false;
bool _scanner_eof = false;
+
+private:
+ Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
};
} /* namespace doris */
diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp
index c4f771faaa..372cb1f0d4 100644
--- a/be/src/exec/broker_scanner.cpp
+++ b/be/src/exec/broker_scanner.cpp
@@ -114,13 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo
{
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool));
- if (_success) {
- free_expr_local_allocations();
- *fill_tuple = true;
- } else {
- *fill_tuple = false;
- }
+ RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple));
break; // break always
}
}
@@ -469,14 +463,14 @@ bool is_null(const Slice& slice) {
}
// Convert one row to this tuple
-Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) {
+Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) {
RETURN_IF_ERROR(_line_to_src_tuple(line));
if (!_success) {
// If not success, which means we met an invalid row, return.
return Status::OK();
}
- return fill_dest_tuple(tuple, tuple_pool);
+ return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
}
// Convert one row to this tuple
diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h
index 059336da90..23d1a81c02 100644
--- a/be/src/exec/broker_scanner.h
+++ b/be/src/exec/broker_scanner.h
@@ -86,7 +86,7 @@ private:
// Convert one row to one tuple
// 'ptr' and 'len' is csv text line
// output is tuple
- Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool);
+ Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);
Status _line_to_src_tuple(const Slice& line);
diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp
index 1e08eec599..945afde21d 100644
--- a/be/src/exec/json_scanner.cpp
+++ b/be/src/exec/json_scanner.cpp
@@ -95,8 +95,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
- *fill_tuple = _success;
+ RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
if (_scanner_eof) {
diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp
index 254c7bb92a..fbefe1cb40 100644
--- a/be/src/exec/orc_scanner.cpp
+++ b/be/src/exec/orc_scanner.cpp
@@ -356,8 +356,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
- *fill_tuple = _success;
+ RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break;
}
if (_scanner_eof) {
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 9a85a1253a..3295dc4bc7 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
- RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
- *fill_tuple = _success;
+ RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
if (_scanner_eof) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org