You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 15:29:07 UTC
[incubator-doris] 12/22: [Bugfix(Vec)] Fix some memory leak issues (#9824)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 60ed43e621dee87c6e0514a2ce9ad022bd6cb192
Author: Amos Bird <am...@gmail.com>
AuthorDate: Sun May 29 23:04:11 2022 +0800
[Bugfix(Vec)] Fix some memory leak issues (#9824)
---
be/src/vec/exec/join/vhash_join_node.cpp | 2 ++
be/src/vec/exec/vaggregation_node.cpp | 5 +++--
be/src/vec/exec/vanalytic_eval_node.cpp | 12 ++++++++++--
be/src/vec/exec/vexchange_node.cpp | 3 +++
be/src/vec/exec/vsort_node.cpp | 3 +--
be/src/vec/exprs/vexpr_context.cpp | 9 ++++++++-
be/src/vec/exprs/vexpr_context.h | 1 +
7 files changed, 28 insertions(+), 7 deletions(-)
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index a350f99ef4..d91ff2d22b 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -808,6 +808,8 @@ Status HashJoinNode::close(RuntimeState* state) {
return Status::OK();
}
+ VExpr::close(_build_expr_ctxs, state);
+ VExpr::close(_probe_expr_ctxs, state);
if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state);
_mem_tracker->Release(_mem_used);
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 76b4b349ef..dcd7b375d7 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -402,10 +402,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
Status AggregationNode::close(RuntimeState* state) {
if (is_closed()) return Status::OK();
- RETURN_IF_ERROR(ExecNode::close(state));
+ for (auto* aggregate_evaluator : _aggregate_evaluators) aggregate_evaluator->close(state);
VExpr::close(_probe_expr_ctxs, state);
if (_executor.close) _executor.close();
- return Status::OK();
+
+ return ExecNode::close(state);
}
Status AggregationNode::_create_agg_status(AggregateDataPtr data) {
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 4d69716216..65765d15d3 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -216,6 +216,9 @@ Status VAnalyticEvalNode::open(RuntimeState* state) {
for (size_t i = 0; i < _agg_functions_size; ++i) {
RETURN_IF_ERROR(VExpr::open(_agg_expr_ctxs[i], state));
}
+ for (auto* agg_function : _agg_functions) {
+ RETURN_IF_ERROR(agg_function->open(state));
+ }
return Status::OK();
}
@@ -223,9 +226,14 @@ Status VAnalyticEvalNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
}
- ExecNode::close(state);
+
+ VExpr::close(_partition_by_eq_expr_ctxs, state);
+ VExpr::close(_order_by_eq_expr_ctxs, state);
+ for (size_t i = 0; i < _agg_functions_size; ++i) VExpr::close(_agg_expr_ctxs[i], state);
+ for (auto* agg_function : _agg_functions) agg_function->close(state);
+
_destory_agg_status();
- return Status::OK();
+ return ExecNode::close(state);
}
Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index 91b107904d..83c9e31b56 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -98,6 +98,9 @@ Status VExchangeNode::close(RuntimeState* state) {
if (_stream_recvr != nullptr) {
_stream_recvr->close();
}
+
+ if (_is_merging) _vsort_exec_exprs.close(state);
+
return ExecNode::close(state);
}
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 734af91baa..99c8090fde 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -104,8 +104,7 @@ Status VSortNode::close(RuntimeState* state) {
}
_mem_tracker->Release(_total_mem_usage);
_vsort_exec_exprs.close(state);
- ExecNode::close(state);
- return Status::OK();
+ return ExecNode::close(state);
}
void VSortNode::debug_string(int indentation_level, stringstream* out) const {
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index b8c7f37b4a..bb8f615dc5 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -29,6 +29,14 @@ VExprContext::VExprContext(VExpr* expr)
_closed(false),
_last_result_column_id(-1) {}
+VExprContext::~VExprContext() {
+ DCHECK(!_prepared || _closed);
+
+ for (int i = 0; i < _fn_contexts.size(); ++i) {
+ delete _fn_contexts[i];
+ }
+}
+
doris::Status VExprContext::execute(doris::vectorized::Block* block, int* result_column_id) {
Status st = _root->execute(this, block, result_column_id);
_last_result_column_id = *result_column_id;
@@ -64,7 +72,6 @@ void VExprContext::close(doris::RuntimeState* state) {
for (int i = 0; i < _fn_contexts.size(); ++i) {
_fn_contexts[i]->impl()->close();
- delete _fn_contexts[i];
}
// _pool can be NULL if Prepare() was never called
if (_pool != NULL) {
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 9d2d467dd4..54b47d7d1b 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -27,6 +27,7 @@ class VExpr;
class VExprContext {
public:
VExprContext(VExpr* expr);
+ ~VExprContext();
Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
const std::shared_ptr<MemTracker>& tracker);
Status open(RuntimeState* state);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org