You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/27 07:43:01 UTC
[doris] 10/10: [fix](memory) Fix AggFunc memory leak due to incorrect destroy (#19126)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git
commit fd2f8a6baa8770b61de9c78e489b415e3f553f2d
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Thu Apr 27 14:58:32 2023 +0800
[fix](memory) Fix AggFunc memory leak due to incorrect destroy (#19126)
---
be/src/olap/memtable.cpp | 18 ++++++++---
be/src/olap/schema_change.cpp | 37 ++++++++++++++--------
be/src/util/defer_op.h | 5 +++
.../vec/aggregate_functions/aggregate_function.h | 31 ++++++++++++++----
.../aggregate_function_distinct.h | 2 +-
.../aggregate_function_java_udaf.h | 7 +++-
.../aggregate_functions/aggregate_function_rpc.h | 3 +-
.../aggregate_functions/aggregate_function_sort.h | 2 +-
.../aggregate_function_window.h | 16 +++++-----
be/src/vec/exec/vaggregation_node.cpp | 9 +++++-
be/src/vec/exec/vaggregation_node.h | 20 +++++++-----
be/src/vec/exec/vanalytic_eval_node.cpp | 12 +++++--
be/src/vec/exec/vanalytic_eval_node.h | 1 +
be/src/vec/olap/block_reader.cpp | 8 +++--
be/src/vec/olap/vertical_block_reader.cpp | 8 +++--
15 files changed, 126 insertions(+), 53 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2b4bb79324..8a0686b21d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -205,11 +205,19 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
row_in_block->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16),
_offsets_of_aggregate_states.data());
for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++) {
- auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
- auto data = row_in_block->agg_places(cid);
- _agg_functions[cid]->create(data);
- _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr),
- row_in_block->_row_pos, nullptr);
+ try {
+ auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
+ auto data = row_in_block->agg_places(cid);
+ _agg_functions[cid]->create(data);
+ _agg_functions[cid]->add(data,
+ const_cast<const doris::vectorized::IColumn**>(&col_ptr),
+ row_in_block->_row_pos, nullptr);
+ } catch (...) {
+ for (size_t i = _schema->num_key_columns(); i < cid; ++i) {
+ _agg_functions[i]->destroy(row_in_block->agg_places(i));
+ }
+ throw;
+ }
}
_vec_skip_list->InsertWithHint(row_in_block, is_exist, &_vec_hint);
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 86c7a86f8a..1fee5f744a 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -89,16 +89,32 @@ public:
std::vector<vectorized::AggregateDataPtr> agg_places;
for (int i = key_number; i < columns; i++) {
- vectorized::AggregateFunctionPtr function =
- tablet_schema->column(i).get_aggregate_function(
- {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
- agg_functions.push_back(function);
- // create aggregate data
- vectorized::AggregateDataPtr place = new char[function->size_of_data()];
- function->create(place);
- agg_places.push_back(place);
+ try {
+ vectorized::AggregateFunctionPtr function =
+ tablet_schema->column(i).get_aggregate_function(
+ {finalized_block.get_data_type(i)},
+ vectorized::AGG_LOAD_SUFFIX);
+ agg_functions.push_back(function);
+ // create aggregate data
+ vectorized::AggregateDataPtr place = new char[function->size_of_data()];
+ function->create(place);
+ agg_places.push_back(place);
+ } catch (...) {
+ for (int j = 0; j < i - key_number; ++j) {
+ agg_functions[j]->destroy(agg_places[j]);
+ delete[] agg_places[j];
+ }
+ throw;
+ }
}
+ DEFER({
+ for (int i = 0; i < columns - key_number; i++) {
+ agg_functions[i]->destroy(agg_places[i]);
+ delete[] agg_places[i];
+ }
+ });
+
for (int i = 0; i < rows; i++) {
auto row_ref = row_refs[i];
@@ -130,11 +146,6 @@ public:
}
}
}
-
- for (int i = 0; i < columns - key_number; i++) {
- agg_functions[i]->destroy(agg_places[i]);
- delete[] agg_places[i];
- }
} else {
std::vector<RowRef> pushed_row_refs;
if (_tablet->keys_type() == KeysType::DUP_KEYS) {
diff --git a/be/src/util/defer_op.h b/be/src/util/defer_op.h
index bb0732c227..cf86af5781 100644
--- a/be/src/util/defer_op.h
+++ b/be/src/util/defer_op.h
@@ -40,4 +40,9 @@ private:
T _closure;
};
+// Lets several DEFERs coexist in one scope: each guard variable's name is suffixed with its line number.
+#define DEFER_CONCAT(n, ...) const auto defer##n = Defer([&]() { __VA_ARGS__; })
+#define DEFER_FWD(n, ...) DEFER_CONCAT(n, __VA_ARGS__)
+#define DEFER(...) DEFER_FWD(__LINE__, __VA_ARGS__)
+
} // namespace doris
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h
index ab0a7dde72..e02c2c16be 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -20,6 +20,7 @@
#pragma once
+#include "util/defer_op.h"
#include "vec/columns/column_complex.h"
#include "vec/common/hash_table/phmap_fwd_decl.h"
#include "vec/core/block.h"
@@ -46,6 +47,16 @@ using DataTypes = std::vector<DataTypePtr>;
using AggregateDataPtr = char*;
using ConstAggregateDataPtr = const char*;
+#define SAFE_CREATE(create, destroy) \
+ do { \
+ try { \
+ create; \
+ } catch (...) { \
+ destroy; \
+ throw; \
+ } \
+ } while (0)
+
/** Aggregate functions interface.
* Instances of classes with this interface do not contain the data itself for aggregation,
* but contain only metadata (description) of the aggregate function,
@@ -306,10 +317,10 @@ public:
char place[size_of_data()];
for (size_t i = 0; i != num_rows; ++i) {
static_cast<const Derived*>(this)->create(place);
+ DEFER({ static_cast<const Derived*>(this)->destroy(place); });
static_cast<const Derived*>(this)->add(place, columns, i, arena);
static_cast<const Derived*>(this)->serialize(place, buf);
buf.commit();
- static_cast<const Derived*>(this)->destroy(place);
}
}
@@ -330,10 +341,18 @@ public:
size_t num_rows) const override {
const auto size_of_data = static_cast<const Derived*>(this)->size_of_data();
for (size_t i = 0; i != num_rows; ++i) {
- auto place = places + size_of_data * i;
- VectorBufferReader buffer_reader(column->get_data_at(i));
- static_cast<const Derived*>(this)->create(place);
- static_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena);
+ try {
+ auto place = places + size_of_data * i;
+ VectorBufferReader buffer_reader(column->get_data_at(i));
+ static_cast<const Derived*>(this)->create(place);
+ static_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena);
+ } catch (...) {
+ for (int j = 0; j < i; ++j) {
+ auto place = places + size_of_data * j;
+ static_cast<const Derived*>(this)->destroy(place);
+ }
+ throw;
+ }
}
}
@@ -402,9 +421,9 @@ public:
auto derived = static_cast<const Derived*>(this);
derived->create(deserialized_place);
+ DEFER({ derived->destroy(deserialized_place); });
derived->deserialize(deserialized_place, buf, arena);
derived->merge(place, deserialized_place, arena);
- derived->destroy(deserialized_place);
}
void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
index 3961e32447..ef467a615b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
@@ -219,7 +219,7 @@ public:
void create(AggregateDataPtr __restrict place) const override {
new (place) Data;
- nested_func->create(get_nested_place(place));
+ SAFE_CREATE(nested_func->create(get_nested_place(place)), this->data(place).~Data());
}
void destroy(AggregateDataPtr __restrict place) const noexcept override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 1295e54fb8..09c2428ff0 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -433,7 +433,12 @@ public:
if (_first_created) {
new (place) Data(argument_types.size());
Status status = Status::OK();
- RETURN_IF_STATUS_ERROR(status, this->data(place).init_udaf(_fn, _local_location));
+ SAFE_CREATE(RETURN_IF_STATUS_ERROR(status,
+ this->data(place).init_udaf(_fn, _local_location)),
+ {
+ this->data(place).destroy();
+ this->data(place).~Data();
+ });
_first_created = false;
_exec_place = place;
}
diff --git a/be/src/vec/aggregate_functions/aggregate_function_rpc.h b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
index 937c936d86..bff4e20a4f 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_rpc.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
@@ -358,7 +358,8 @@ public:
void create(AggregateDataPtr __restrict place) const override {
new (place) Data(argument_types.size());
Status status = Status::OK();
- RETURN_IF_STATUS_ERROR(status, data(place).init(_fn));
+ SAFE_CREATE(RETURN_IF_STATUS_ERROR(status, data(place).init(_fn)),
+ this->data(place).~Data());
}
String get_name() const override { return _fn.name.function_name; }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index 4abdb9c980..1c78c8fb2e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -158,7 +158,7 @@ public:
void create(AggregateDataPtr __restrict place) const override {
new (place) Data(_sort_desc, _block);
- _nested_func->create(get_nested_place(place));
+ SAFE_CREATE(_nested_func->create(get_nested_place(place)), this->data(place).~Data());
}
void destroy(AggregateDataPtr __restrict place) const noexcept override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 83a4586ab2..6ab756cc50 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -34,7 +34,7 @@
namespace doris::vectorized {
struct RowNumberData {
- int64_t count;
+ int64_t count = 0;
};
class WindowFunctionRowNumber final
@@ -71,9 +71,9 @@ public:
};
struct RankData {
- int64_t rank;
- int64_t count;
- int64_t peer_group_start;
+ int64_t rank = 0;
+ int64_t count = 0;
+ int64_t peer_group_start = 0;
};
class WindowFunctionRank final : public IAggregateFunctionDataHelper<RankData, WindowFunctionRank> {
@@ -116,8 +116,8 @@ public:
};
struct DenseRankData {
- int64_t rank;
- int64_t peer_group_start;
+ int64_t rank = 0;
+ int64_t peer_group_start = 0;
};
class WindowFunctionDenseRank final
: public IAggregateFunctionDataHelper<DenseRankData, WindowFunctionDenseRank> {
@@ -157,8 +157,8 @@ public:
};
struct NTileData {
- int64_t bucket_index;
- int64_t rows;
+ int64_t bucket_index = 0;
+ int64_t rows = 0;
};
class WindowFunctionNTile final
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index c19a4b7730..91ddedd22a 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -603,7 +603,14 @@ Status AggregationNode::close(RuntimeState* state) {
Status AggregationNode::_create_agg_status(AggregateDataPtr data) {
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->create(data + _offsets_of_aggregate_states[i]);
+ try {
+ _aggregate_evaluators[i]->create(data + _offsets_of_aggregate_states[i]);
+ } catch (...) {
+ for (int j = 0; j < i; ++j) {
+ _aggregate_evaluators[j]->destroy(data + _offsets_of_aggregate_states[j]);
+ }
+ throw;
+ }
}
return Status::OK();
}
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 34bcb7efed..90c2eaff5a 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -1217,13 +1217,15 @@ private:
_deserialize_buffer.data(), (ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
}
+
+ DEFER({
+ _aggregate_evaluators[i]->function()->destroy_vec(
+ _deserialize_buffer.data(), rows);
+ });
+
_aggregate_evaluators[i]->function()->merge_vec_selected(
_places.data(), _offsets_of_aggregate_states[i],
_deserialize_buffer.data(), _agg_arena_pool.get(), rows);
-
- _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
- rows);
-
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
block, _offsets_of_aggregate_states[i], _places.data(),
@@ -1257,13 +1259,15 @@ private:
_deserialize_buffer.data(), (ColumnString*)(column.get()),
_agg_arena_pool.get(), rows);
}
+
+ DEFER({
+ _aggregate_evaluators[i]->function()->destroy_vec(
+ _deserialize_buffer.data(), rows);
+ });
+
_aggregate_evaluators[i]->function()->merge_vec(
_places.data(), _offsets_of_aggregate_states[i],
_deserialize_buffer.data(), _agg_arena_pool.get(), rows);
-
- _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
- rows);
-
} else {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
block, _offsets_of_aggregate_states[i], _places.data(),
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 932baa370d..881928be42 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -712,13 +712,21 @@ Status VAnalyticEvalNode::_reset_agg_status() {
Status VAnalyticEvalNode::_create_agg_status() {
for (size_t i = 0; i < _agg_functions_size; ++i) {
- _agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]);
+ try {
+ _agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]);
+ } catch (...) {
+ for (int j = 0; j < i; ++j) {
+ _agg_functions[j]->destroy(_fn_place_ptr + _offsets_of_aggregate_states[j]);
+ }
+ throw;
+ }
}
+ _agg_functions_created = true;
return Status::OK();
}
Status VAnalyticEvalNode::_destroy_agg_status() {
- if (UNLIKELY(_fn_place_ptr == nullptr)) {
+ if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
return Status::OK();
}
for (size_t i = 0; i < _agg_functions_size; ++i) {
diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h
index ca7d723974..62a64a448d 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -142,6 +142,7 @@ private:
int64_t _rows_start_offset = 0;
int64_t _rows_end_offset = 0;
size_t _agg_functions_size = 0;
+ bool _agg_functions_created = false;
/// The offset of the n-th functions.
std::vector<size_t> _offsets_of_aggregate_states;
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index a9dbcd6226..aed8eabe11 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -129,7 +129,10 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) {
_agg_functions.push_back(function);
// create aggregate data
AggregateDataPtr place = new char[function->size_of_data()];
- function->create(place);
+ SAFE_CREATE(function->create(place), {
+ _agg_functions.pop_back();
+ delete[] place;
+ });
_agg_places.push_back(place);
// calculate `has_string` tag.
@@ -438,8 +441,7 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,
if (is_close) {
function->insert_result_into(place, *columns[_return_columns_loc[idx]]);
// reset aggregate data
- function->destroy(place);
- function->create(place);
+ function->reset(place);
}
}
}
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 097ddb7513..e5ef2a403c 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -154,7 +154,10 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
_agg_functions.push_back(function);
// create aggregate data
AggregateDataPtr place = new char[function->size_of_data()];
- function->create(place);
+ SAFE_CREATE(function->create(place), {
+ _agg_functions.pop_back();
+ delete[] place;
+ });
_agg_places.push_back(place);
// calculate `has_string` tag.
@@ -271,8 +274,7 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin,
if (is_close) {
function->insert_result_into(place, *columns[idx]);
// reset aggregate data
- function->destroy(place);
- function->create(place);
+ function->reset(place);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org