You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 13:49:51 UTC
[doris] 03/05: [Bug](function) catch function calculation error on aggregate node to avoid core dump (#15903)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 12faf8a99ded291deae17e9cdb5c6ff1104b5f9a
Author: Pxl <px...@qq.com>
AuthorDate: Mon Jan 16 11:21:28 2023 +0800
[Bug](function) catch function calculation error on aggregate node to avoid core dump (#15903)
---
be/src/vec/exec/vaggregation_node.cpp | 36 ++++++++++----------
be/src/vec/exec/vaggregation_node.h | 35 ++++++++++++--------
be/src/vec/exprs/vectorized_agg_fn.cpp | 38 +++++++++++++---------
be/src/vec/exprs/vectorized_agg_fn.h | 20 ++++++------
.../bitmap_functions/test_bitmap_function.out | 3 ++
.../bitmap_functions/test_bitmap_function.groovy | 16 +++++++++
6 files changed, 91 insertions(+), 57 deletions(-)
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 9056dee8ca..e93fc89591 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -81,12 +81,11 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
- _intermediate_tuple_desc(NULL),
+ _intermediate_tuple_desc(nullptr),
_output_tuple_id(tnode.agg_node.output_tuple_id),
- _output_tuple_desc(NULL),
+ _output_tuple_desc(nullptr),
_needs_finalize(tnode.agg_node.need_finalize),
_is_merge(false),
- _agg_data(),
_build_timer(nullptr),
_serialize_key_timer(nullptr),
_exec_timer(nullptr),
@@ -671,9 +670,9 @@ Status AggregationNode::_execute_without_key(Block* block) {
DCHECK(_agg_data->without_key != nullptr);
SCOPED_TIMER(_build_timer);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->execute_single_add(
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
block, _agg_data->without_key + _offsets_of_aggregate_states[i],
- _agg_arena_pool.get());
+ _agg_arena_pool.get()));
}
return Status::OK();
}
@@ -706,9 +705,9 @@ Status AggregationNode::_merge_without_key(Block* block) {
}
}
} else {
- _aggregate_evaluators[i]->execute_single_add(
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
block, _agg_data->without_key + _offsets_of_aggregate_states[i],
- _agg_arena_pool.get());
+ _agg_arena_pool.get()));
}
}
return Status::OK();
@@ -974,8 +973,8 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
// to avoid wasting memory.
// But for fixed hash map, it never need to expand
bool ret_flag = false;
- std::visit(
- [&](auto&& agg_method) -> void {
+ RETURN_IF_ERROR(std::visit(
+ [&](auto&& agg_method) -> Status {
if (auto& hash_tbl = agg_method.data; hash_tbl.add_elem_size_overflow(rows)) {
// do not try to do agg, just init and serialize directly return the out_block
if (!_should_expand_preagg_hash_tables()) {
@@ -1007,8 +1006,10 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
SCOPED_TIMER(_serialize_data_timer);
- _aggregate_evaluators[i]->streaming_agg_serialize_to_column(
- in_block, value_columns[i], rows, _agg_arena_pool.get());
+ RETURN_IF_ERROR(
+ _aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+ in_block, value_columns[i], rows,
+ _agg_arena_pool.get()));
}
} else {
std::vector<VectorBufferWriter> value_buffer_writers;
@@ -1031,9 +1032,9 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
SCOPED_TIMER(_serialize_data_timer);
- _aggregate_evaluators[i]->streaming_agg_serialize(
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize(
in_block, value_buffer_writers[i], rows,
- _agg_arena_pool.get());
+ _agg_arena_pool.get()));
}
}
@@ -1059,16 +1060,17 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
}
}
}
+ return Status::OK();
},
- _agg_data->_aggregated_method_variant);
+ _agg_data->_aggregated_method_variant));
if (!ret_flag) {
RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows));
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->execute_batch_add(in_block, _offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool.get(),
- _should_expand_hash_table);
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
+ in_block, _offsets_of_aggregate_states[i], _places.data(),
+ _agg_arena_pool.get(), _should_expand_hash_table));
}
}
diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h
index 23203380ff..7a636227d6 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -106,12 +106,16 @@ struct AggregationMethodSerialized {
static void insert_key_into_columns(const StringRef& key, MutableColumns& key_columns,
const Sizes&) {
auto pos = key.data;
- for (auto& column : key_columns) pos = column->deserialize_and_insert_from_arena(pos);
+ for (auto& column : key_columns) {
+ pos = column->deserialize_and_insert_from_arena(pos);
+ }
}
static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
const size_t num_rows, const Sizes&) {
- for (auto& column : key_columns) column->deserialize_vec(keys, num_rows);
+ for (auto& column : key_columns) {
+ column->deserialize_vec(keys, num_rows);
+ }
}
void init_once() {
@@ -260,7 +264,7 @@ struct AggregationMethodKeysFixed {
Iterator iterator;
bool inited = false;
- AggregationMethodKeysFixed() {}
+ AggregationMethodKeysFixed() = default;
template <typename Other>
AggregationMethodKeysFixed(const Other& other) : data(other.data) {}
@@ -282,7 +286,9 @@ struct AggregationMethodKeysFixed {
ColumnUInt8* null_map;
bool column_nullable = false;
- if constexpr (has_nullable_keys) column_nullable = is_column_nullable(*key_columns[i]);
+ if constexpr (has_nullable_keys) {
+ column_nullable = is_column_nullable(*key_columns[i]);
+ }
/// If we have a nullable column, get its nested column and its null map.
if (column_nullable) {
@@ -305,9 +311,9 @@ struct AggregationMethodKeysFixed {
is_null = val == 1;
}
- if (has_nullable_keys && is_null)
+ if (has_nullable_keys && is_null) {
observed_column->insert_default();
- else {
+ } else {
size_t size = key_sizes[i];
observed_column->insert_data(reinterpret_cast<const char*>(&key) + pos, size);
pos += size;
@@ -901,16 +907,17 @@ private:
_find_in_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->execute_batch_add_selected(
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
block, _offsets_of_aggregate_states[i], _places.data(),
- _agg_arena_pool.get());
+ _agg_arena_pool.get()));
}
} else {
_emplace_into_hash_table(_places.data(), key_columns, rows);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->execute_batch_add(block, _offsets_of_aggregate_states[i],
- _places.data(), _agg_arena_pool.get());
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
+ block, _offsets_of_aggregate_states[i], _places.data(),
+ _agg_arena_pool.get()));
}
if (_should_limit_output) {
@@ -985,9 +992,9 @@ private:
rows);
} else {
- _aggregate_evaluators[i]->execute_batch_add_selected(
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add_selected(
block, _offsets_of_aggregate_states[i], _places.data(),
- _agg_arena_pool.get());
+ _agg_arena_pool.get()));
}
}
} else {
@@ -1025,9 +1032,9 @@ private:
rows);
} else {
- _aggregate_evaluators[i]->execute_batch_add(
+ RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
block, _offsets_of_aggregate_states[i], _places.data(),
- _agg_arena_pool.get());
+ _agg_arena_pool.get()));
}
}
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 0895eae775..101717d4cd 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -154,38 +154,43 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) {
_function->destroy(place);
}
-void AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) {
- _calc_argment_columns(block);
+Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) {
+ RETURN_IF_ERROR(_calc_argment_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena);
+ return Status::OK();
}
-void AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
- Arena* arena, bool agg_many) {
- _calc_argment_columns(block);
+Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
+ Arena* arena, bool agg_many) {
+ RETURN_IF_ERROR(_calc_argment_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many);
+ return Status::OK();
}
-void AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
- AggregateDataPtr* places, Arena* arena) {
- _calc_argment_columns(block);
+Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
+ AggregateDataPtr* places, Arena* arena) {
+ RETURN_IF_ERROR(_calc_argment_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena);
+ return Status::OK();
}
-void AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
- const size_t num_rows, Arena* arena) {
- _calc_argment_columns(block);
+Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
+ const size_t num_rows, Arena* arena) {
+ RETURN_IF_ERROR(_calc_argment_columns(block));
SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena);
+ return Status::OK();
}
-void AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
- const size_t num_rows, Arena* arena) {
- _calc_argment_columns(block);
+Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
+ const size_t num_rows, Arena* arena) {
+ RETURN_IF_ERROR(_calc_argment_columns(block));
SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena);
+ return Status::OK();
}
void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn* column) {
@@ -220,19 +225,20 @@ std::string AggFnEvaluator::debug_string() const {
return out.str();
}
-void AggFnEvaluator::_calc_argment_columns(Block* block) {
+Status AggFnEvaluator::_calc_argment_columns(Block* block) {
SCOPED_TIMER(_expr_timer);
_agg_columns.resize(_input_exprs_ctxs.size());
int column_ids[_input_exprs_ctxs.size()];
for (int i = 0; i < _input_exprs_ctxs.size(); ++i) {
int column_id = -1;
- _input_exprs_ctxs[i]->execute(block, &column_id);
+ RETURN_IF_ERROR(_input_exprs_ctxs[i]->execute(block, &column_id));
column_ids[i] = column_id;
}
materialize_block_inplace(*block, column_ids, column_ids + _input_exprs_ctxs.size());
for (int i = 0; i < _input_exprs_ctxs.size(); ++i) {
_agg_columns[i] = block->get_by_position(column_ids[i]).column.get();
}
+ return Status::OK();
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h
index fa025edec1..1fe6214654 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -53,19 +53,19 @@ public:
void destroy(AggregateDataPtr place);
// agg_function
- void execute_single_add(Block* block, AggregateDataPtr place, Arena* arena = nullptr);
+ Status execute_single_add(Block* block, AggregateDataPtr place, Arena* arena = nullptr);
- void execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
- Arena* arena = nullptr, bool agg_many = false);
+ Status execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
+ Arena* arena = nullptr, bool agg_many = false);
- void execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places,
- Arena* arena = nullptr);
+ Status execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places,
+ Arena* arena = nullptr);
- void streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows,
- Arena* arena);
+ Status streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows,
+ Arena* arena);
- void streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
- const size_t num_rows, Arena* arena);
+ Status streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
+ const size_t num_rows, Arena* arena);
void insert_result_info(AggregateDataPtr place, IColumn* column);
@@ -89,7 +89,7 @@ private:
AggFnEvaluator(const TExprNode& desc);
- void _calc_argment_columns(Block* block);
+ Status _calc_argment_columns(Block* block);
DataTypes _argument_types_with_sort;
DataTypes _real_argument_types;
diff --git a/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out b/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out
index ed771ea325..d7b68d929b 100644
--- a/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out
+++ b/regression-test/data/query_p0/sql_functions/bitmap_functions/test_bitmap_function.out
@@ -271,3 +271,6 @@ false
-- !sql --
20221103
+-- !sql --
+\N
+
diff --git a/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy b/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
index 8669f961a4..9a61f874fc 100644
--- a/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
+++ b/regression-test/suites/query_p0/sql_functions/bitmap_functions/test_bitmap_function.groovy
@@ -191,4 +191,20 @@ suite("test_bitmap_function") {
qt_sql """ select bitmap_to_string(sub_bitmap(bitmap_from_string('1'), 0, 3)) value; """
qt_sql """ select bitmap_to_string(bitmap_subset_limit(bitmap_from_string('100'), 0, 3)) value; """
qt_sql """ select bitmap_to_string(bitmap_subset_in_range(bitmap_from_string('20221103'), 0, 20221104)) date_list_bitmap; """
+
+ sql "drop table if exists d_table;"
+ sql """
+ create table d_table (
+ k1 int null,
+ k2 int not null,
+ k3 bigint null,
+ k4 varchar(100) null
+ )
+ duplicate key (k1,k2,k3)
+ distributed BY hash(k1) buckets 3
+ properties("replication_num" = "1");
+ """
+ sql "insert into d_table select -4,-4,-4,'d';"
+ try_sql "select bitmap_union(to_bitmap_with_check(k2)) from d_table;"
+ qt_sql "select bitmap_union(to_bitmap(k2)) from d_table;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org