Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 13:49:51 UTC

[doris] 03/05: [Bug](function) catch function calculation error on aggregate node to avoid core dump (#15903)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 12faf8a99ded291deae17e9cdb5c6ff1104b5f9a
Author: Pxl <px...@qq.com>
AuthorDate: Mon Jan 16 11:21:28 2023 +0800

    [Bug](function)  catch function calculation error on aggregate node to avoid core dump (#15903)
---
 be/src/vec/exec/vaggregation_node.cpp              | 36 ++++++++++----------
 be/src/vec/exec/vaggregation_node.h                | 35 ++++++++++++--------
 be/src/vec/exprs/vectorized_agg_fn.cpp             | 38 +++++++++++++---------
 be/src/vec/exprs/vectorized_agg_fn.h               | 20 ++++++------
 .../bitmap_functions/test_bitmap_function.out      |  3 ++
 .../bitmap_functions/test_bitmap_function.groovy   | 16 +++++++++
 6 files changed, 91 insertions(+), 57 deletions(-)

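Editor's note: for readers skimming the diff below, the core change is mechanical: the aggregate evaluator entry points (execute_single_add, execute_batch_add, execute_batch_add_selected, streaming_agg_serialize*) switch from returning void to returning Status, and every call site in the aggregation node is wrapped in RETURN_IF_ERROR, so a function calculation error is propagated up as a query error instead of aborting the BE. The following minimal, self-contained sketch illustrates only that pattern; the Status type, the RETURN_IF_ERROR macro, and the evaluator here are simplified hypothetical stand-ins, not the real Doris definitions.

    #include <iostream>
    #include <string>
    #include <vector>

    // Simplified stand-in for Doris's Status.
    struct Status {
        bool ok_flag = true;
        std::string msg;
        static Status OK() { return {}; }
        static Status RuntimeError(std::string m) { return {false, std::move(m)}; }
        bool ok() const { return ok_flag; }
    };

    // Simplified stand-in for Doris's RETURN_IF_ERROR: return early from the
    // enclosing function if the wrapped call failed.
    #define RETURN_IF_ERROR(stmt)          \
        do {                               \
            Status _s = (stmt);            \
            if (!_s.ok()) return _s;       \
        } while (false)

    // Hypothetical evaluator. Before the patch the real method returned void,
    // so a bad input could only crash; now it reports the problem to the caller.
    struct FakeAggEvaluator {
        long sum = 0;
        Status execute_single_add(int value) {
            if (value < 0) {
                return Status::RuntimeError("to_bitmap only accepts non-negative input");
            }
            sum += value;
            return Status::OK();
        }
    };

    // Mirrors the shape of AggregationNode::_execute_without_key: every
    // evaluator call is checked, so the first failure stops the loop and
    // bubbles up as a Status instead of a core dump.
    Status execute_without_key(std::vector<FakeAggEvaluator>& evaluators, int input) {
        for (auto& eval : evaluators) {
            RETURN_IF_ERROR(eval.execute_single_add(input));
        }
        return Status::OK();
    }

    int main() {
        std::vector<FakeAggEvaluator> evaluators(2);
        Status s = execute_without_key(evaluators, -4);  // same bad input as the new regression test
        std::cout << (s.ok() ? "OK" : "error: " + s.msg) << std::endl;
        return 0;
    }
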
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 9056dee8ca..e93fc89591 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -81,12 +81,11 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
                                  const DescriptorTbl& descs)
         : ExecNode(pool, tnode, descs),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
-          _intermediate_tuple_desc(NULL),
+          _intermediate_tuple_desc(nullptr),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
-          _output_tuple_desc(NULL),
+          _output_tuple_desc(nullptr),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_merge(false),
-          _agg_data(),
           _build_timer(nullptr),
           _serialize_key_timer(nullptr),
           _exec_timer(nullptr),
@@ -671,9 +670,9 @@ Status AggregationNode::_execute_without_key(Block* block) {
     DCHECK(_agg_data->without_key != nullptr);
     SCOPED_TIMER(_build_timer);
     for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-        _aggregate_evaluators[i]->execute_single_add(
+        RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
                 block, _agg_data->without_key + _offsets_of_aggregate_states[i],
-                _agg_arena_pool.get());
+                _agg_arena_pool.get()));
     }
     return Status::OK();
 }
@@ -706,9 +705,9 @@ Status AggregationNode::_merge_without_key(Block* block) {
                 }
             }
         } else {
-            _aggregate_evaluators[i]->execute_single_add(
+            RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
                     block, _agg_data->without_key + _offsets_of_aggregate_states[i],
-                    _agg_arena_pool.get());
+                    _agg_arena_pool.get()));
         }
     }
     return Status::OK();
@@ -974,8 +973,8 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
     // to avoid wasting memory.
     // But for fixed hash map, it never need to expand
     bool ret_flag = false;
-    std::visit(
-            [&](auto&& agg_method) -> void {
+    RETURN_IF_ERROR(std::visit(
+            [&](auto&& agg_method) -> Status {
                 if (auto& hash_tbl = agg_method.data; hash_tbl.add_elem_size_overflow(rows)) {
                     // do not try to do agg, just init and serialize directly return the out_block
                     if (!_should_expand_preagg_hash_tables()) {
@@ -1007,8 +1006,10 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                             for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
                                 SCOPED_TIMER(_serialize_data_timer);
-                                _aggregate_evaluators[i]->streaming_agg_serialize_to_column(
-                                        in_block, value_columns[i], rows, _agg_arena_pool.get());
+                                RETURN_IF_ERROR(
+                                        _aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+                                                in_block, value_columns[i], rows,
+                                                _agg_arena_pool.get()));
                             }
                         } else {
                             std::vector<VectorBufferWriter> value_buffer_writers;
@@ -1031,9 +1032,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
 
                             for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
                                 SCOPED_TIMER(_serialize_data_timer);
-                                _aggregate_evaluators[i]->streaming_agg_serialize(
+                                RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize(
                                         in_block, value_buffer_writers[i], rows,
-                                        _agg_arena_pool.get());
+                                        _agg_arena_pool.get()));
                             }
                         }
 
@@ -1059,16 +1060,17 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
                         }
                     }
                 }
+                return Status::OK();
             },
-            _agg_data->_aggregated_method_variant);
+            _agg_data->_aggregated_method_variant));
 
     if (!ret_flag) {
         RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows));
 
         for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-            _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
-                                                        _places.data(), _agg_arena_pool.get(),
-                                                        _should_expand_hash_table);
+            RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
+                    in_block, _offsets_of_aggregate_states[i], _places.data(),
+                    _agg_arena_pool.get(), _should_expand_hash_table));
         }
     }
 
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 23203380ff..7a636227d6 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -106,12 +106,16 @@ struct AggregationMethodSerialized {
     static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns,
                                         const Sizes&) {
         auto pos = key.data;
-        for (auto& column : key_columns) pos = column->deserialize_and_insert_from_arena(pos);
+        for (auto& column : key_columns) {
+            pos = column->deserialize_and_insert_from_arena(pos);
+        }
     }
 
     static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
                                          const size_t num_rows, const Sizes&) {
-        for (auto& column : key_columns) column->deserialize_vec(keys, num_rows);
+        for (auto& column : key_columns) {
+            column->deserialize_vec(keys, num_rows);
+        }
     }
 
     void init_once() {
@@ -260,7 +264,7 @@ struct AggregationMethodKeysFixed {
     Iterator iterator;
     bool inited = false;
 
-    AggregationMethodKeysFixed() {}
+    AggregationMethodKeysFixed() = default;
 
     template <typename Other>
     AggregationMethodKeysFixed(const Other& other) : data(other.data) {}
@@ -282,7 +286,9 @@ struct AggregationMethodKeysFixed {
             ColumnUInt8* null_map;
 
             bool column_nullable = false;
-            if constexpr (has_nullable_keys) column_nullable = is_column_nullable(*key_columns[i]);
+            if constexpr (has_nullable_keys) {
+                column_nullable = is_column_nullable(*key_columns[i]);
+            }
 
             /// If we have a nullable column, get its nested column and its null map.
             if (column_nullable) {
@@ -305,9 +311,9 @@ struct AggregationMethodKeysFixed {
                 is_null = val == 1;
             }
 
-            if (has_nullable_keys && is_null)
+            if (has_nullable_keys && is_null) {
                 observed_column->insert_default();
-            else {
+            } else {
                 size_t size = key_sizes[i];
                 observed_column->insert_data(reinterpret_cast<const char*>(&key) + pos, size);
                 pos += size;
@@ -901,16 +907,17 @@ private:
             _find_in_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-                _aggregate_evaluators[i]->execute_batch_add_selected(
+                RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
                         block, _offsets_of_aggregate_states[i], _places.data(),
-                        _agg_arena_pool.get());
+                        _agg_arena_pool.get()));
             }
         } else {
             _emplace_into_hash_table(_places.data(), key_columns, rows);
 
             for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
-                _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i],
-                                                            _places.data(), _agg_arena_pool.get());
+                RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
+                        block, _offsets_of_aggregate_states[i], _places.data(),
+                        _agg_arena_pool.get()));
             }
 
             if (_should_limit_output) {
@@ -985,9 +992,9 @@ private:
                                                                       rows);
 
                 } else {
-                    _aggregate_evaluators[i]->execute_batch_add_selected(
+                    RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
                             block, _offsets_of_aggregate_states[i], _places.data(),
-                            _agg_arena_pool.get());
+                            _agg_arena_pool.get()));
                 }
             }
         } else {
@@ -1025,9 +1032,9 @@ private:
                                                                       rows);
 
                 } else {
-                    _aggregate_evaluators[i]->execute_batch_add(
+                    RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
                             block, _offsets_of_aggregate_states[i], _places.data(),
-                            _agg_arena_pool.get());
+                            _agg_arena_pool.get()));
                 }
             }
 
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 0895eae775..101717d4cd 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -154,38 +154,43 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) {
     _function->destroy(place);
 }
 
-void AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) {
-    _calc_argment_columns(block);
+Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) {
+    RETURN_IF_ERROR(_calc_argment_columns(block));
     SCOPED_TIMER(_exec_timer);
     _function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena);
+    return Status::OK();
 }
 
-void AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
-                                       Arena* arena, bool agg_many) {
-    _calc_argment_columns(block);
+Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
+                                         Arena* arena, bool agg_many) {
+    RETURN_IF_ERROR(_calc_argment_columns(block));
     SCOPED_TIMER(_exec_timer);
     _function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many);
+    return Status::OK();
 }
 
-void AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
-                                                AggregateDataPtr* places, Arena* arena) {
-    _calc_argment_columns(block);
+Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
+                                                  AggregateDataPtr* places, Arena* arena) {
+    RETURN_IF_ERROR(_calc_argment_columns(block));
     SCOPED_TIMER(_exec_timer);
     _function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena);
+    return Status::OK();
 }
 
-void AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
-                                             const size_t num_rows, Arena* arena) {
-    _calc_argment_columns(block);
+Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
+                                               const size_t num_rows, Arena* arena) {
+    RETURN_IF_ERROR(_calc_argment_columns(block));
     SCOPED_TIMER(_exec_timer);
     _function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena);
+    return Status::OK();
 }
 
-void AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
-                                                       const size_t num_rows, Arena* arena) {
-    _calc_argment_columns(block);
+Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
+                                                         const size_t num_rows, Arena* arena) {
+    RETURN_IF_ERROR(_calc_argment_columns(block));
     SCOPED_TIMER(_exec_timer);
     _function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena);
+    return Status::OK();
 }
 
 void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* column) {
@@ -220,19 +225,20 @@ std::string AggFnEvaluator::debug_string() const {
     return out.str();
 }
 
-void AggFnEvaluator::_calc_argment_columns(Block* block) {
+Status AggFnEvaluator::_calc_argment_columns(Block* block) {
     SCOPED_TIMER(_expr_timer);
     _agg_columns.resize(_input_exprs_ctxs.size());
     int column_ids[_input_exprs_ctxs.size()];
     for (int i = 0; i < _input_exprs_ctxs.size(); ++i) {
         int column_id = -1;
-        _input_exprs_ctxs[i]->execute(block, &column_id);
+        RETURN_IF_ERROR(_input_exprs_ctxs[i]->execute(block, &column_id));
         column_ids[i] = column_id;
     }
     materialize_block_inplace(*block, column_ids, column_ids + _input_exprs_ctxs.size());
     for (int i = 0; i < _input_exprs_ctxs.size(); ++i) {
         _agg_columns[i] = block->get_by_position(column_ids[i]).column.get();
     }
+    return Status::OK();
 }
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h
index fa025edec1..1fe6214654 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -53,19 +53,19 @@ public:
     void destroy(AggregateDataPtr place);
 
     // agg_function
-    void execute_single_add(Block* block, AggregateDataPtr place, Arena* arena = nullptr);
+    Status execute_single_add(Block* block, AggregateDataPtr place, Arena* arena = nullptr);
 
-    void execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
-                           Arena* arena = nullptr, bool agg_many = false);
+    Status execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
+                             Arena* arena = nullptr, bool agg_many = false);
 
-    void execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places,
-                                    Arena* arena = nullptr);
+    Status execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places,
+                                      Arena* arena = nullptr);
 
-    void streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows,
-                                 Arena* arena);
+    Status streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows,
+                                   Arena* arena);
 
-    void streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
-                                           const size_t num_rows, Arena* arena);
+    Status streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
+                                             const size_t num_rows, Arena* arena);
 
     void insert_result_info(AggregateDataPtr place, IColumn* column);
 
@@ -89,7 +89,7 @@ private:
 
     AggFnEvaluator(const TExprNode& desc);
 
-    void _calc_argment_columns(Block* block);
+    Status _calc_argment_columns(Block* block);
 
     DataTypes _argument_types_with_sort;
     DataTypes _real_argument_types;
diff --git a/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out b/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out
index ed771ea325..d7b68d929b 100644
--- a/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out
+++ b/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out
@@ -271,3 +271,6 @@ false
 -- !sql --
 20221103
 
+-- !sql --
+\N
+
diff --git a/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy b/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
index 8669f961a4..9a61f874fc 100644
--- a/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
+++ b/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
@@ -191,4 +191,20 @@ suite("test_bitmap_function") {
     qt_sql """ select bitmap_to_string(sub_bitmap(bitmap_from_string('1'), 0, 3)) value;  """
     qt_sql """ select bitmap_to_string(bitmap_subset_limit(bitmap_from_string('100'), 0, 3)) value;  """
     qt_sql """ select bitmap_to_string(bitmap_subset_in_range(bitmap_from_string('20221103'), 0, 20221104)) date_list_bitmap;  """
+
+    sql "drop table if exists d_table;"
+    sql """
+        create table d_table (
+            k1 int null,
+            k2 int not null,
+            k3 bigint null,
+            k4 varchar(100) null
+        )
+        duplicate key (k1,k2,k3)
+        distributed BY hash(k1) buckets 3
+        properties("replication_num" = "1");
+    """
+    sql "insert into d_table select -4,-4,-4,'d';"
+    try_sql "select bitmap_union(to_bitmap_with_check(k2)) from d_table;"
+    qt_sql "select bitmap_union(to_bitmap(k2)) from d_table;"
 }
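Editor's note: the regression case added above is the reproducer for this fix. It inserts a negative value (-4) and runs bitmap_union(to_bitmap_with_check(k2)), which previously triggered a calculation error inside the aggregate node that could core dump the BE; with the Status propagation in this patch the error is returned for the query instead. The failing statement is issued through try_sql, which appears intended to let the suite tolerate the expected error, while the plain bitmap_union(to_bitmap(k2)) query still succeeds and yields the NULL (\N) result recorded in test_bitmap_function.out.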


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org