You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/29 15:04:18 UTC

[incubator-doris] branch master updated: [Bugfix(Vec)] Fix some memory leak issues (#9824)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 63aab5ee5d [Bugfix(Vec)] Fix some memory leak issues (#9824)
63aab5ee5d is described below

commit 63aab5ee5d8555b92b1ba504f33e37994e175784
Author: Amos Bird <am...@gmail.com>
AuthorDate: Sun May 29 23:04:11 2022 +0800

    [Bugfix(Vec)] Fix some memory leak issues (#9824)
---
 be/src/vec/exec/join/vhash_join_node.cpp |  2 ++
 be/src/vec/exec/vaggregation_node.cpp    |  5 +++--
 be/src/vec/exec/vanalytic_eval_node.cpp  | 12 ++++++++++--
 be/src/vec/exec/vexchange_node.cpp       |  3 +++
 be/src/vec/exec/vsort_node.cpp           |  3 +--
 be/src/vec/exprs/vexpr_context.cpp       |  9 ++++++++-
 be/src/vec/exprs/vexpr_context.h         |  1 +
 7 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index e3d83cfe11..e9d61a982d 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -820,6 +820,8 @@ Status HashJoinNode::close(RuntimeState* state) {
         return Status::OK();
     }
 
+    VExpr::close(_build_expr_ctxs, state);
+    VExpr::close(_probe_expr_ctxs, state);
     if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state);
 
     _hash_table_mem_tracker->release(_mem_used);
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index e7ff52f1d6..b8f92bd4bf 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -406,10 +406,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
 Status AggregationNode::close(RuntimeState* state) {
     if (is_closed()) return Status::OK();
 
-    RETURN_IF_ERROR(ExecNode::close(state));
+    for (auto* aggregate_evaluator : _aggregate_evaluators) aggregate_evaluator->close(state);
     VExpr::close(_probe_expr_ctxs, state);
     if (_executor.close) _executor.close();
-    return Status::OK();
+
+    return ExecNode::close(state);
 }
 
 Status AggregationNode::_create_agg_status(AggregateDataPtr data) {
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index b4fe98ec86..a88c7a746a 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -223,6 +223,9 @@ Status VAnalyticEvalNode::open(RuntimeState* state) {
     for (size_t i = 0; i < _agg_functions_size; ++i) {
         RETURN_IF_ERROR(VExpr::open(_agg_expr_ctxs[i], state));
     }
+    for (auto* agg_function : _agg_functions) {
+        RETURN_IF_ERROR(agg_function->open(state));
+    }
     return Status::OK();
 }
 
@@ -230,9 +233,14 @@ Status VAnalyticEvalNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
     }
-    ExecNode::close(state);
+
+    VExpr::close(_partition_by_eq_expr_ctxs, state);
+    VExpr::close(_order_by_eq_expr_ctxs, state);
+    for (size_t i = 0; i < _agg_functions_size; ++i) VExpr::close(_agg_expr_ctxs[i], state);
+    for (auto* agg_function : _agg_functions) agg_function->close(state);
+
     _destory_agg_status();
-    return Status::OK();
+    return ExecNode::close(state);
 }
 
 Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index 57cbcff921..f790aa7436 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -103,6 +103,9 @@ Status VExchangeNode::close(RuntimeState* state) {
     if (_stream_recvr != nullptr) {
         _stream_recvr->close();
     }
+
+    if (_is_merging) _vsort_exec_exprs.close(state);
+
     return ExecNode::close(state);
 }
 
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4d4284bc26..0a3aa18b2f 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -107,8 +107,7 @@ Status VSortNode::close(RuntimeState* state) {
     }
     _block_mem_tracker->release(_total_mem_usage);
     _vsort_exec_exprs.close(state);
-    ExecNode::close(state);
-    return Status::OK();
+    return ExecNode::close(state);
 }
 
 void VSortNode::debug_string(int indentation_level, stringstream* out) const {
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 6615e0ff8c..ea3300eaf3 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -30,6 +30,14 @@ VExprContext::VExprContext(VExpr* expr)
           _closed(false),
           _last_result_column_id(-1) {}
 
+VExprContext::~VExprContext() {
+    DCHECK(!_prepared || _closed);
+
+    for (int i = 0; i < _fn_contexts.size(); ++i) {
+        delete _fn_contexts[i];
+    }
+}
+
 doris::Status VExprContext::execute(doris::vectorized::Block* block, int* result_column_id) {
     Status st = _root->execute(this, block, result_column_id);
     _last_result_column_id = *result_column_id;
@@ -68,7 +76,6 @@ void VExprContext::close(doris::RuntimeState* state) {
 
     for (int i = 0; i < _fn_contexts.size(); ++i) {
         _fn_contexts[i]->impl()->close();
-        delete _fn_contexts[i];
     }
     // _pool can be NULL if Prepare() was never called
     if (_pool != NULL) {
diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h
index 80da73531b..86a9d086a6 100644
--- a/be/src/vec/exprs/vexpr_context.h
+++ b/be/src/vec/exprs/vexpr_context.h
@@ -27,6 +27,7 @@ class VExpr;
 class VExprContext {
 public:
     VExprContext(VExpr* expr);
+    ~VExprContext();
     Status prepare(RuntimeState* state, const RowDescriptor& row_desc,
                    const std::shared_ptr<MemTracker>& tracker);
     Status open(RuntimeState* state);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org