You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/27 07:43:01 UTC

[doris] 10/10: [fix](memory) Fix AggFunc memory leak due to incorrect destroy (#19126)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git

commit fd2f8a6baa8770b61de9c78e489b415e3f553f2d
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Thu Apr 27 14:58:32 2023 +0800

    [fix](memory) Fix AggFunc memory leak due to incorrect destroy (#19126)
---
 be/src/olap/memtable.cpp                           | 18 ++++++++---
 be/src/olap/schema_change.cpp                      | 37 ++++++++++++++--------
 be/src/util/defer_op.h                             |  5 +++
 .../vec/aggregate_functions/aggregate_function.h   | 31 ++++++++++++++----
 .../aggregate_function_distinct.h                  |  2 +-
 .../aggregate_function_java_udaf.h                 |  7 +++-
 .../aggregate_functions/aggregate_function_rpc.h   |  3 +-
 .../aggregate_functions/aggregate_function_sort.h  |  2 +-
 .../aggregate_function_window.h                    | 16 +++++-----
 be/src/vec/exec/vaggregation_node.cpp              |  9 +++++-
 be/src/vec/exec/vaggregation_node.h                | 20 +++++++-----
 be/src/vec/exec/vanalytic_eval_node.cpp            | 12 +++++--
 be/src/vec/exec/vanalytic_eval_node.h              |  1 +
 be/src/vec/olap/block_reader.cpp                   |  8 +++--
 be/src/vec/olap/vertical_block_reader.cpp          |  8 +++--
 15 files changed, 126 insertions(+), 53 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2b4bb79324..8a0686b21d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -205,11 +205,19 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
         row_in_block->init_agg_places(_arena->aligned_alloc(_total_size_of_aggregate_states, 16),
                                       _offsets_of_aggregate_states.data());
         for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++) {
-            auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
-            auto data = row_in_block->agg_places(cid);
-            _agg_functions[cid]->create(data);
-            _agg_functions[cid]->add(data, const_cast<const doris::vectorized::IColumn**>(&col_ptr),
-                                     row_in_block->_row_pos, nullptr);
+            try {
+                auto col_ptr = _input_mutable_block.mutable_columns()[cid].get();
+                auto data = row_in_block->agg_places(cid);
+                _agg_functions[cid]->create(data);
+                _agg_functions[cid]->add(data,
+                                         const_cast<const doris::vectorized::IColumn**>(&col_ptr),
+                                         row_in_block->_row_pos, nullptr);
+            } catch (...) {
+                for (size_t i = _schema->num_key_columns(); i < cid; ++i) {
+                    _agg_functions[i]->destroy(row_in_block->agg_places(i));
+                }
+                throw;
+            }
         }
 
         _vec_skip_list->InsertWithHint(row_in_block, is_exist, &_vec_hint);
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 86c7a86f8a..1fee5f744a 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -89,16 +89,32 @@ public:
             std::vector<vectorized::AggregateDataPtr> agg_places;
 
             for (int i = key_number; i < columns; i++) {
-                vectorized::AggregateFunctionPtr function =
-                        tablet_schema->column(i).get_aggregate_function(
-                                {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
-                agg_functions.push_back(function);
-                // create aggregate data
-                vectorized::AggregateDataPtr place = new char[function->size_of_data()];
-                function->create(place);
-                agg_places.push_back(place);
+                try {
+                    vectorized::AggregateFunctionPtr function =
+                            tablet_schema->column(i).get_aggregate_function(
+                                    {finalized_block.get_data_type(i)},
+                                    vectorized::AGG_LOAD_SUFFIX);
+                    agg_functions.push_back(function);
+                    // create aggregate data
+                    vectorized::AggregateDataPtr place = new char[function->size_of_data()];
+                    function->create(place);
+                    agg_places.push_back(place);
+                } catch (...) {
+                    for (int j = 0; j < i - key_number; ++j) {
+                        agg_functions[j]->destroy(agg_places[j]);
+                        delete[] agg_places[j];
+                    }
+                    throw;
+                }
             }
 
+            DEFER({
+                for (int i = 0; i < columns - key_number; i++) {
+                    agg_functions[i]->destroy(agg_places[i]);
+                    delete[] agg_places[i];
+                }
+            });
+
             for (int i = 0; i < rows; i++) {
                 auto row_ref = row_refs[i];
 
@@ -130,11 +146,6 @@ public:
                     }
                 }
             }
-
-            for (int i = 0; i < columns - key_number; i++) {
-                agg_functions[i]->destroy(agg_places[i]);
-                delete[] agg_places[i];
-            }
         } else {
             std::vector<RowRef> pushed_row_refs;
             if (_tablet->keys_type() == KeysType::DUP_KEYS) {
diff --git a/be/src/util/defer_op.h b/be/src/util/defer_op.h
index bb0732c227..cf86af5781 100644
--- a/be/src/util/defer_op.h
+++ b/be/src/util/defer_op.h
@@ -40,4 +40,9 @@ private:
     T _closure;
 };
 
+// Supports nested use of Defer: the variable name is formed by appending the line number
+#define DEFER_CONCAT(n, ...) const auto defer##n = Defer([&]() { __VA_ARGS__; })
+#define DEFER_FWD(n, ...) DEFER_CONCAT(n, __VA_ARGS__)
+#define DEFER(...) DEFER_FWD(__LINE__, __VA_ARGS__)
+
 } // namespace doris
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h b/be/src/vec/aggregate_functions/aggregate_function.h
index ab0a7dde72..e02c2c16be 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -20,6 +20,7 @@
 
 #pragma once
 
+#include "util/defer_op.h"
 #include "vec/columns/column_complex.h"
 #include "vec/common/hash_table/phmap_fwd_decl.h"
 #include "vec/core/block.h"
@@ -46,6 +47,16 @@ using DataTypes = std::vector<DataTypePtr>;
 using AggregateDataPtr = char*;
 using ConstAggregateDataPtr = const char*;
 
+#define SAFE_CREATE(create, destroy) \
+    do {                             \
+        try {                        \
+            create;                  \
+        } catch (...) {              \
+            destroy;                 \
+            throw;                   \
+        }                            \
+    } while (0)
+
 /** Aggregate functions interface.
   * Instances of classes with this interface do not contain the data itself for aggregation,
   *  but contain only metadata (description) of the aggregate function,
@@ -306,10 +317,10 @@ public:
         char place[size_of_data()];
         for (size_t i = 0; i != num_rows; ++i) {
             static_cast<const Derived*>(this)->create(place);
+            DEFER({ static_cast<const Derived*>(this)->destroy(place); });
             static_cast<const Derived*>(this)->add(place, columns, i, arena);
             static_cast<const Derived*>(this)->serialize(place, buf);
             buf.commit();
-            static_cast<const Derived*>(this)->destroy(place);
         }
     }
 
@@ -330,10 +341,18 @@ public:
                          size_t num_rows) const override {
         const auto size_of_data = static_cast<const Derived*>(this)->size_of_data();
         for (size_t i = 0; i != num_rows; ++i) {
-            auto place = places + size_of_data * i;
-            VectorBufferReader buffer_reader(column->get_data_at(i));
-            static_cast<const Derived*>(this)->create(place);
-            static_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena);
+            try {
+                auto place = places + size_of_data * i;
+                VectorBufferReader buffer_reader(column->get_data_at(i));
+                static_cast<const Derived*>(this)->create(place);
+                static_cast<const Derived*>(this)->deserialize(place, buffer_reader, arena);
+            } catch (...) {
+                for (int j = 0; j < i; ++j) {
+                    auto place = places + size_of_data * j;
+                    static_cast<const Derived*>(this)->destroy(place);
+                }
+                throw;
+            }
         }
     }
 
@@ -402,9 +421,9 @@ public:
 
         auto derived = static_cast<const Derived*>(this);
         derived->create(deserialized_place);
+        DEFER({ derived->destroy(deserialized_place); });
         derived->deserialize(deserialized_place, buf, arena);
         derived->merge(place, deserialized_place, arena);
-        derived->destroy(deserialized_place);
     }
 
     void deserialize_and_merge_from_column(AggregateDataPtr __restrict place, const IColumn& column,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_distinct.h b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
index 3961e32447..ef467a615b 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_distinct.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_distinct.h
@@ -219,7 +219,7 @@ public:
 
     void create(AggregateDataPtr __restrict place) const override {
         new (place) Data;
-        nested_func->create(get_nested_place(place));
+        SAFE_CREATE(nested_func->create(get_nested_place(place)), this->data(place).~Data());
     }
 
     void destroy(AggregateDataPtr __restrict place) const noexcept override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
index 1295e54fb8..09c2428ff0 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h
@@ -433,7 +433,12 @@ public:
         if (_first_created) {
             new (place) Data(argument_types.size());
             Status status = Status::OK();
-            RETURN_IF_STATUS_ERROR(status, this->data(place).init_udaf(_fn, _local_location));
+            SAFE_CREATE(RETURN_IF_STATUS_ERROR(status,
+                                               this->data(place).init_udaf(_fn, _local_location)),
+                        {
+                            this->data(place).destroy();
+                            this->data(place).~Data();
+                        });
             _first_created = false;
             _exec_place = place;
         }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_rpc.h b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
index 937c936d86..bff4e20a4f 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_rpc.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_rpc.h
@@ -358,7 +358,8 @@ public:
     void create(AggregateDataPtr __restrict place) const override {
         new (place) Data(argument_types.size());
         Status status = Status::OK();
-        RETURN_IF_STATUS_ERROR(status, data(place).init(_fn));
+        SAFE_CREATE(RETURN_IF_STATUS_ERROR(status, data(place).init(_fn)),
+                    this->data(place).~Data());
     }
 
     String get_name() const override { return _fn.name.function_name; }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index 4abdb9c980..1c78c8fb2e 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -158,7 +158,7 @@ public:
 
     void create(AggregateDataPtr __restrict place) const override {
         new (place) Data(_sort_desc, _block);
-        _nested_func->create(get_nested_place(place));
+        SAFE_CREATE(_nested_func->create(get_nested_place(place)), this->data(place).~Data());
     }
 
     void destroy(AggregateDataPtr __restrict place) const noexcept override {
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 83a4586ab2..6ab756cc50 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -34,7 +34,7 @@
 namespace doris::vectorized {
 
 struct RowNumberData {
-    int64_t count;
+    int64_t count = 0;
 };
 
 class WindowFunctionRowNumber final
@@ -71,9 +71,9 @@ public:
 };
 
 struct RankData {
-    int64_t rank;
-    int64_t count;
-    int64_t peer_group_start;
+    int64_t rank = 0;
+    int64_t count = 0;
+    int64_t peer_group_start = 0;
 };
 
 class WindowFunctionRank final : public IAggregateFunctionDataHelper<RankData, WindowFunctionRank> {
@@ -116,8 +116,8 @@ public:
 };
 
 struct DenseRankData {
-    int64_t rank;
-    int64_t peer_group_start;
+    int64_t rank = 0;
+    int64_t peer_group_start = 0;
 };
 class WindowFunctionDenseRank final
         : public IAggregateFunctionDataHelper<DenseRankData, WindowFunctionDenseRank> {
@@ -157,8 +157,8 @@ public:
 };
 
 struct NTileData {
-    int64_t bucket_index;
-    int64_t rows;
+    int64_t bucket_index = 0;
+    int64_t rows = 0;
 };
 
 class WindowFunctionNTile final
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index c19a4b7730..91ddedd22a 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -603,7 +603,14 @@ Status AggregationNode::close(RuntimeState* state) {
 
 Status AggregationNode::_create_agg_status(AggregateDataPtr data) {
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        _aggregate_evaluators[i]->create(data + _offsets_of_aggregate_states[i]);
+        try {
+            _aggregate_evaluators[i]->create(data + _offsets_of_aggregate_states[i]);
+        } catch (...) {
+            for (int j = 0; j < i; ++j) {
+                _aggregate_evaluators[j]->destroy(data + _offsets_of_aggregate_states[j]);
+            }
+            throw;
+        }
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 34bcb7efed..90c2eaff5a 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -1217,13 +1217,15 @@ private:
                                 _deserialize_buffer.data(), (ColumnString*)(column.get()),
                                 _agg_arena_pool.get(), rows);
                     }
+
+                    DEFER({
+                        _aggregate_evaluators[i]->function()->destroy_vec(
+                                _deserialize_buffer.data(), rows);
+                    });
+
                     _aggregate_evaluators[i]->function()->merge_vec_selected(
                             _places.data(), _offsets_of_aggregate_states[i],
                             _deserialize_buffer.data(), _agg_arena_pool.get(), rows);
-
-                    _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
-                                                                      rows);
-
                 } else {
                     RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
                             block, _offsets_of_aggregate_states[i], _places.data(),
@@ -1257,13 +1259,15 @@ private:
                                 _deserialize_buffer.data(), (ColumnString*)(column.get()),
                                 _agg_arena_pool.get(), rows);
                     }
+
+                    DEFER({
+                        _aggregate_evaluators[i]->function()->destroy_vec(
+                                _deserialize_buffer.data(), rows);
+                    });
+
                     _aggregate_evaluators[i]->function()->merge_vec(
                             _places.data(), _offsets_of_aggregate_states[i],
                             _deserialize_buffer.data(), _agg_arena_pool.get(), rows);
-
-                    _aggregate_evaluators[i]->function()->destroy_vec(_deserialize_buffer.data(),
-                                                                      rows);
-
                 } else {
                     RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
                             block, _offsets_of_aggregate_states[i], _places.data(),
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 932baa370d..881928be42 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -712,13 +712,21 @@ Status VAnalyticEvalNode::_reset_agg_status() {
 
 Status VAnalyticEvalNode::_create_agg_status() {
     for (size_t i = 0; i < _agg_functions_size; ++i) {
-        _agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]);
+        try {
+            _agg_functions[i]->create(_fn_place_ptr + _offsets_of_aggregate_states[i]);
+        } catch (...) {
+            for (int j = 0; j < i; ++j) {
+                _agg_functions[j]->destroy(_fn_place_ptr + _offsets_of_aggregate_states[j]);
+            }
+            throw;
+        }
     }
+    _agg_functions_created = true;
     return Status::OK();
 }
 
 Status VAnalyticEvalNode::_destroy_agg_status() {
-    if (UNLIKELY(_fn_place_ptr == nullptr)) {
+    if (UNLIKELY(_fn_place_ptr == nullptr || !_agg_functions_created)) {
         return Status::OK();
     }
     for (size_t i = 0; i < _agg_functions_size; ++i) {
diff --git a/be/src/vec/exec/vanalytic_eval_node.h b/be/src/vec/exec/vanalytic_eval_node.h
index ca7d723974..62a64a448d 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -142,6 +142,7 @@ private:
     int64_t _rows_start_offset = 0;
     int64_t _rows_end_offset = 0;
     size_t _agg_functions_size = 0;
+    bool _agg_functions_created = false;
 
     /// The offset of the n-th functions.
     std::vector<size_t> _offsets_of_aggregate_states;
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index a9dbcd6226..aed8eabe11 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -129,7 +129,10 @@ void BlockReader::_init_agg_state(const ReaderParams& read_params) {
         _agg_functions.push_back(function);
         // create aggregate data
         AggregateDataPtr place = new char[function->size_of_data()];
-        function->create(place);
+        SAFE_CREATE(function->create(place), {
+            _agg_functions.pop_back();
+            delete[] place;
+        });
         _agg_places.push_back(place);
 
         // calculate `has_string` tag.
@@ -438,8 +441,7 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,
         if (is_close) {
             function->insert_result_into(place, *columns[_return_columns_loc[idx]]);
             // reset aggregate data
-            function->destroy(place);
-            function->create(place);
+            function->reset(place);
         }
     }
 }
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 097ddb7513..e5ef2a403c 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -154,7 +154,10 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
         _agg_functions.push_back(function);
         // create aggregate data
         AggregateDataPtr place = new char[function->size_of_data()];
-        function->create(place);
+        SAFE_CREATE(function->create(place), {
+            _agg_functions.pop_back();
+            delete[] place;
+        });
         _agg_places.push_back(place);
 
         // calculate `has_string` tag.
@@ -271,8 +274,7 @@ void VerticalBlockReader::_update_agg_value(MutableColumns& columns, int begin,
         if (is_close) {
             function->insert_result_into(place, *columns[idx]);
             // reset aggregate data
-            function->destroy(place);
-            function->create(place);
+            function->reset(place);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org