You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/19 13:34:53 UTC
[doris] 27/36: [enhancement](exception) Column filter/replicate supports exception safety (#18503)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.0-alpha
in repository https://gitbox.apache.org/repos/asf/doris.git
commit fb1f54ddd50663659bbb5bf85918df8d7776f4e8
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Tue Apr 18 19:23:09 2023 +0800
[enhancement](exception) Column filter/replicate supports exception safety (#18503)
---
be/src/common/status.h | 2 +
be/src/exec/base_scanner.cpp | 2 +
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 15 +++----
be/src/runtime/thread_context.h | 48 +++++++++-------------
be/src/vec/columns/column_array.cpp | 24 ++++-------
be/src/vec/columns/column_complex.h | 13 ++----
be/src/vec/columns/column_const.cpp | 15 ++-----
be/src/vec/columns/column_decimal.cpp | 12 ++----
be/src/vec/columns/column_dummy.h | 4 +-
be/src/vec/columns/column_string.cpp | 4 +-
be/src/vec/columns/column_vector.cpp | 14 ++-----
be/src/vec/columns/columns_common.cpp | 8 +---
be/src/vec/columns/columns_common.h | 18 ++++++++
be/src/vec/common/sort/heap_sorter.cpp | 3 +-
be/src/vec/common/sort/sorter.cpp | 10 ++---
be/src/vec/core/block.cpp | 4 +-
be/src/vec/core/block.h | 3 ++
be/src/vec/core/block_spill_writer.cpp | 6 +--
be/src/vec/core/sort_cursor.h | 1 +
.../exec/format/parquet/vparquet_group_reader.cpp | 18 ++++----
.../vec/exec/join/process_hash_table_probe_impl.h | 4 +-
be/src/vec/exec/join/vhash_join_node.cpp | 10 ++---
be/src/vec/exec/join/vnested_loop_join_node.cpp | 2 +
be/src/vec/exec/join/vnested_loop_join_node.h | 18 ++++----
be/src/vec/exec/vaggregation_node.cpp | 2 +-
be/src/vec/exec/varrow_scanner.cpp | 2 +-
be/src/vec/exprs/vexpr_context.cpp | 6 +--
.../vec/functions/array/function_array_apply.cpp | 8 +++-
.../array/function_array_with_constant.cpp | 3 +-
be/src/vec/sink/vtablet_sink.cpp | 3 +-
30 files changed, 128 insertions(+), 154 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index ec8fbba4fe..d989ba12c3 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -209,6 +209,8 @@ E(COLUMN_READ_STREAM, -1706);
E(COLUMN_STREAM_NOT_EXIST, -1716);
E(COLUMN_VALUE_NULL, -1717);
E(COLUMN_SEEK_ERROR, -1719);
+E(COLUMN_NO_MATCH_OFFSETS_SIZE, -1720);
+E(COLUMN_NO_MATCH_FILTER_SIZE, -1721);
E(DELETE_INVALID_CONDITION, -1900);
E(DELETE_UPDATE_HEADER_FAILED, -1901);
E(DELETE_SAVE_HEADER_FAILED, -1902);
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 043913a5f8..20633d2c48 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -194,6 +194,7 @@ Status BaseScanner::init_expr_ctxes() {
return Status::OK();
}
+// need exception safety
Status BaseScanner::_filter_src_block() {
auto origin_column_num = _src_block.columns();
// filter block
@@ -348,6 +349,7 @@ Status BaseScanner::_init_src_block() {
return Status::OK();
}
+// need exception safety
Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) {
*eof = _scanner_eof;
_fill_columns_from_path();
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 4d4aa4ae56..5d3c375f54 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1601,13 +1601,8 @@ Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_colu
}
Status SegmentIterator::next_batch(vectorized::Block* block) {
- Status st;
- try {
- st = _next_batch_internal(block);
- } catch (const doris::Exception& e) {
- st = Status::Error(e.code(), e.to_string());
- }
- return st;
+ RETURN_IF_CATCH_EXCEPTION({ return _next_batch_internal(block); });
+ return Status::OK();
}
Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
@@ -1882,7 +1877,8 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t&
}
selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter);
- vectorized::Block::filter_block_internal(block, _columns_to_filter, filter);
+ RETURN_IF_CATCH_EXCEPTION(
+ vectorized::Block::filter_block_internal(block, _columns_to_filter, filter));
} else if (auto* const_column =
vectorized::check_and_get_column<vectorized::ColumnConst>(*filter_column)) {
bool ret = const_column->get_bool(0);
@@ -1898,7 +1894,8 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t&
*filter_column)
.get_data();
selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter);
- vectorized::Block::filter_block_internal(block, _columns_to_filter, filter);
+ RETURN_IF_CATCH_EXCEPTION(
+ vectorized::Block::filter_block_internal(block, _columns_to_filter, filter));
}
return Status::OK();
}
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c86691a062..04e8915378 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -110,7 +110,7 @@ public:
};
inline thread_local ThreadContextPtr thread_context_ptr;
-inline thread_local bool enable_thread_catch_bad_alloc = false;
+inline thread_local int enable_thread_catch_bad_alloc = 0;
// To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS
// in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS.
@@ -292,33 +292,22 @@ private:
tracker->transfer_to( \
size, doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_raw())
-// Consider catching other memory errors, such as memset failure, etc.
-#define RETURN_IF_CATCH_BAD_ALLOC(stmt) \
- do { \
- doris::thread_context()->thread_mem_tracker_mgr->clear_exceed_mem_limit_msg(); \
- if (doris::enable_thread_catch_bad_alloc) { \
- try { \
- { stmt; } \
- } catch (std::bad_alloc const& e) { \
- doris::thread_context()->thread_mem_tracker()->print_log_usage( \
- doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \
- return Status::MemoryLimitExceeded(fmt::format( \
- "PreCatch {}, {}", e.what(), \
- doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \
- } \
- } else { \
- try { \
- doris::enable_thread_catch_bad_alloc = true; \
- Defer defer {[&]() { doris::enable_thread_catch_bad_alloc = false; }}; \
- { stmt; } \
- } catch (std::bad_alloc const& e) { \
- doris::thread_context()->thread_mem_tracker()->print_log_usage( \
- doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \
- return Status::MemoryLimitExceeded(fmt::format( \
- "PreCatch {}, {}", e.what(), \
- doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \
- } \
- } \
+#define RETURN_IF_CATCH_EXCEPTION(stmt) \
+ do { \
+ try { \
+ doris::thread_context()->thread_mem_tracker_mgr->clear_exceed_mem_limit_msg(); \
+ doris::enable_thread_catch_bad_alloc++; \
+ Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }}; \
+ { stmt; } \
+ } catch (std::bad_alloc const& e) { \
+ doris::thread_context()->thread_mem_tracker()->print_log_usage( \
+ doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg()); \
+ return Status::MemoryLimitExceeded(fmt::format( \
+ "PreCatch {}, {}", e.what(), \
+ doris::thread_context()->thread_mem_tracker_mgr->exceed_mem_limit_msg())); \
+ } catch (const doris::Exception& e) { \
+ return Status::Error(e.code(), e.to_string()); \
+ } \
} while (0)
// Mem Hook to consume thread mem tracker
@@ -350,6 +339,7 @@ private:
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0
#define CONSUME_MEM_TRACKER(size) (void)0
#define RELEASE_MEM_TRACKER(size) (void)0
-#define RETURN_IF_CATCH_BAD_ALLOC(stmt) (stmt)
+#define RETURN_IF_CATCH_EXCEPTION(stmt) \
+ { stmt; }
#endif
} // namespace doris
diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp
index 35f016d210..79b1aa7914 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -440,7 +440,7 @@ size_t ColumnArray::filter_number(const Filter& filter) {
ColumnPtr ColumnArray::filter_string(const Filter& filt, ssize_t result_size_hint) const {
size_t col_size = get_offsets().size();
- if (col_size != filt.size()) LOG(FATAL) << "Size of filter doesn't match size of column.";
+ column_match_filter_size(col_size, filt.size());
if (0 == col_size) return ColumnArray::create(data);
@@ -503,9 +503,7 @@ ColumnPtr ColumnArray::filter_string(const Filter& filt, ssize_t result_size_hin
size_t ColumnArray::filter_string(const Filter& filter) {
size_t col_size = get_offsets().size();
- if (col_size != filter.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(col_size, filter.size());
if (0 == col_size) {
return ColumnArray::create(data);
@@ -567,7 +565,7 @@ size_t ColumnArray::filter_string(const Filter& filter) {
ColumnPtr ColumnArray::filter_generic(const Filter& filt, ssize_t result_size_hint) const {
size_t size = get_offsets().size();
- if (size != filt.size()) LOG(FATAL) << "Size of filter doesn't match size of column.";
+ column_match_filter_size(size, filt.size());
if (size == 0) return ColumnArray::create(data);
@@ -606,9 +604,7 @@ ColumnPtr ColumnArray::filter_generic(const Filter& filt, ssize_t result_size_hi
size_t ColumnArray::filter_generic(const Filter& filter) {
size_t size = get_offsets().size();
- if (size != filter.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(size, filter.size());
if (size == 0) {
return 0;
@@ -788,8 +784,7 @@ void ColumnArray::replicate(const uint32_t* counts, size_t target_size, IColumn&
template <typename T>
ColumnPtr ColumnArray::replicate_number(const IColumn::Offsets& replicate_offsets) const {
size_t col_size = size();
- if (col_size != replicate_offsets.size())
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
+ column_match_offsets_size(col_size, replicate_offsets.size());
MutableColumnPtr res = clone_empty();
@@ -836,8 +831,7 @@ ColumnPtr ColumnArray::replicate_number(const IColumn::Offsets& replicate_offset
ColumnPtr ColumnArray::replicate_string(const IColumn::Offsets& replicate_offsets) const {
size_t col_size = size();
- if (col_size != replicate_offsets.size())
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
+ column_match_offsets_size(col_size, replicate_offsets.size());
MutableColumnPtr res = clone_empty();
@@ -910,8 +904,7 @@ ColumnPtr ColumnArray::replicate_string(const IColumn::Offsets& replicate_offset
ColumnPtr ColumnArray::replicate_const(const IColumn::Offsets& replicate_offsets) const {
size_t col_size = size();
- if (col_size != replicate_offsets.size())
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
+ column_match_offsets_size(col_size, replicate_offsets.size());
if (0 == col_size) return clone_empty();
@@ -944,8 +937,7 @@ ColumnPtr ColumnArray::replicate_const(const IColumn::Offsets& replicate_offsets
ColumnPtr ColumnArray::replicate_generic(const IColumn::Offsets& replicate_offsets) const {
size_t col_size = size();
- if (col_size != replicate_offsets.size())
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
+ column_match_offsets_size(col_size, replicate_offsets.size());
MutableColumnPtr res = clone_empty();
ColumnArray& res_concrete = assert_cast<ColumnArray&>(*res);
diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h
index 95a6793853..138f3d0fb3 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -29,6 +29,7 @@
#include "vec/columns/column_impl.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
+#include "vec/columns/columns_common.h"
#include "vec/core/types.h"
namespace doris::vectorized {
@@ -313,9 +314,7 @@ template <typename T>
ColumnPtr ColumnComplexType<T>::filter(const IColumn::Filter& filt,
ssize_t result_size_hint) const {
size_t size = data.size();
- if (size != filt.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(size, filt.size());
if (data.size() == 0) return this->create();
auto res = this->create();
@@ -340,9 +339,7 @@ ColumnPtr ColumnComplexType<T>::filter(const IColumn::Filter& filt,
template <typename T>
size_t ColumnComplexType<T>::filter(const IColumn::Filter& filter) {
size_t size = data.size();
- if (size != filter.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(size, filter.size());
if (data.size() == 0) {
return 0;
@@ -391,9 +388,7 @@ ColumnPtr ColumnComplexType<T>::permute(const IColumn::Permutation& perm, size_t
template <typename T>
ColumnPtr ColumnComplexType<T>::replicate(const IColumn::Offsets& offsets) const {
size_t size = data.size();
- if (size != offsets.size()) {
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
- }
+ column_match_offsets_size(size, offsets.size());
if (0 == size) return this->create();
diff --git a/be/src/vec/columns/column_const.cpp b/be/src/vec/columns/column_const.cpp
index 96d0c013b1..0beec35355 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -55,19 +55,13 @@ ColumnPtr ColumnConst::remove_low_cardinality() const {
}
ColumnPtr ColumnConst::filter(const Filter& filt, ssize_t /*result_size_hint*/) const {
- if (s != filt.size()) {
- LOG(FATAL) << fmt::format("Size of filter ({}) doesn't match size of column ({})",
- filt.size(), s);
- }
+ column_match_filter_size(s, filt.size());
return ColumnConst::create(data, count_bytes_in_filter(filt));
}
size_t ColumnConst::filter(const Filter& filter) {
- if (s != filter.size()) {
- LOG(FATAL) << fmt::format("Size of filter ({}) doesn't match size of column ({})",
- filter.size(), s);
- }
+ column_match_filter_size(s, filter.size());
const auto result_size = count_bytes_in_filter(filter);
resize(result_size);
@@ -75,10 +69,7 @@ size_t ColumnConst::filter(const Filter& filter) {
}
ColumnPtr ColumnConst::replicate(const Offsets& offsets) const {
- if (s != offsets.size()) {
- LOG(FATAL) << fmt::format("Size of offsets ({}) doesn't match size of column ({})",
- offsets.size(), s);
- }
+ column_match_offsets_size(s, offsets.size());
size_t replicated_size = 0 == s ? 0 : offsets.back();
return ColumnConst::create(data, replicated_size);
diff --git a/be/src/vec/columns/column_decimal.cpp b/be/src/vec/columns/column_decimal.cpp
index 7731e05389..ab375d1d0d 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -276,9 +276,7 @@ void ColumnDecimal<T>::insert_range_from(const IColumn& src, size_t start, size_
template <typename T>
ColumnPtr ColumnDecimal<T>::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const {
size_t size = data.size();
- if (size != filt.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(size, filt.size());
auto res = this->create(0, scale);
Container& res_data = res->get_data();
@@ -327,9 +325,7 @@ ColumnPtr ColumnDecimal<T>::filter(const IColumn::Filter& filt, ssize_t result_s
template <typename T>
size_t ColumnDecimal<T>::filter(const IColumn::Filter& filter) {
size_t size = data.size();
- if (size != filter.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(size, filter.size());
const UInt8* filter_pos = filter.data();
const UInt8* filter_end = filter_pos + size;
@@ -382,9 +378,7 @@ size_t ColumnDecimal<T>::filter(const IColumn::Filter& filter) {
template <typename T>
ColumnPtr ColumnDecimal<T>::replicate(const IColumn::Offsets& offsets) const {
size_t size = data.size();
- if (size != offsets.size()) {
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
- }
+ column_match_offsets_size(size, offsets.size());
auto res = this->create(0, scale);
if (0 == size) return res;
diff --git a/be/src/vec/columns/column_dummy.h b/be/src/vec/columns/column_dummy.h
index b66b284e3f..f6b36ae80a 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -108,9 +108,7 @@ public:
}
ColumnPtr replicate(const Offsets& offsets) const override {
- if (s != offsets.size()) {
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
- }
+ column_match_offsets_size(s, offsets.size());
return clone_dummy(offsets.back());
}
diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index eff29d17a4..c6317e7453 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -377,9 +377,7 @@ void ColumnString::get_permutation(bool reverse, size_t limit, int /*nan_directi
ColumnPtr ColumnString::replicate(const Offsets& replicate_offsets) const {
size_t col_size = size();
- if (col_size != replicate_offsets.size()) {
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
- }
+ column_match_offsets_size(col_size, replicate_offsets.size());
auto res = ColumnString::create();
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index 4deaa46b69..45c8b94ddb 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -387,10 +387,7 @@ void ColumnVector<T>::insert_indices_from(const IColumn& src, const int* indices
template <typename T>
ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t result_size_hint) const {
size_t size = data.size();
- if (size != filt.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column. data size: " << size
- << ", filter size: " << filt.size() << get_stack_trace();
- }
+ column_match_filter_size(size, filt.size());
auto res = this->create();
if constexpr (std::is_same_v<T, vectorized::Int64>) {
@@ -444,10 +441,7 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t result_si
template <typename T>
size_t ColumnVector<T>::filter(const IColumn::Filter& filter) {
size_t size = data.size();
- if (size != filter.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column. data size: " << size
- << ", filter size: " << filter.size() << get_stack_trace();
- }
+ column_match_filter_size(size, filter.size());
const UInt8* filter_pos = filter.data();
const UInt8* filter_end = filter_pos + size;
@@ -523,9 +517,7 @@ ColumnPtr ColumnVector<T>::permute(const IColumn::Permutation& perm, size_t limi
template <typename T>
ColumnPtr ColumnVector<T>::replicate(const IColumn::Offsets& offsets) const {
size_t size = data.size();
- if (size != offsets.size()) {
- LOG(FATAL) << "Size of offsets doesn't match size of column.";
- }
+ column_match_offsets_size(size, offsets.size());
auto res = this->create();
if constexpr (std::is_same_v<T, vectorized::Int64>) {
diff --git a/be/src/vec/columns/columns_common.cpp b/be/src/vec/columns/columns_common.cpp
index 5665fd2998..4a4200a002 100644
--- a/be/src/vec/columns/columns_common.cpp
+++ b/be/src/vec/columns/columns_common.cpp
@@ -153,9 +153,7 @@ void filter_arrays_impl_generic(const PaddedPODArray<T>& src_elems,
PaddedPODArray<OT>* res_offsets, const IColumn::Filter& filt,
ssize_t result_size_hint) {
const size_t size = src_offsets.size();
- if (size != filt.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(size, filt.size());
constexpr int ASSUME_STRING_LENGTH = 5;
ResultOffsetsBuilder result_offsets_builder(res_offsets);
@@ -233,9 +231,7 @@ size_t filter_arrays_impl_generic_without_reserving(PaddedPODArray<T>& elems,
PaddedPODArray<OT>& offsets,
const IColumn::Filter& filter) {
const size_t size = offsets.size();
- if (offsets.size() != filter.size()) {
- LOG(FATAL) << "Size of filter doesn't match size of column.";
- }
+ column_match_filter_size(size, filter.size());
/// If no need to filter the `offsets`, here do not reset the end ptr of `offsets`
if constexpr (!std::is_same_v<ResultOffsetsBuilder, NoResultOffsetsBuilder<OT>>) {
diff --git a/be/src/vec/columns/columns_common.h b/be/src/vec/columns/columns_common.h
index 3bae20f0ba..6102cef86b 100644
--- a/be/src/vec/columns/columns_common.h
+++ b/be/src/vec/columns/columns_common.h
@@ -59,6 +59,24 @@ template <typename T, typename OT>
size_t filter_arrays_impl_only_data(PaddedPODArray<T>& data, PaddedPODArray<OT>& offsets,
const IColumn::Filter& filter);
+inline void column_match_offsets_size(size_t size, size_t offsets_size) {
+ if (size != offsets_size) {
+ throw doris::Exception(
+ ErrorCode::COLUMN_NO_MATCH_OFFSETS_SIZE,
+ "Size of offsets doesn't match size of column: size={}, offsets.size={}", size,
+ offsets_size);
+ }
+}
+
+inline void column_match_filter_size(size_t size, size_t filter_size) {
+ if (size != filter_size) {
+ throw doris::Exception(
+ ErrorCode::COLUMN_NO_MATCH_FILTER_SIZE,
+ "Size of filter doesn't match size of column: size={}, filter.size={}", size,
+ filter_size);
+ }
+}
+
namespace detail {
template <typename T>
const PaddedPODArray<T>* get_indexes_data(const IColumn& indexes);
diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp
index de36223915..406176ff51 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -74,7 +74,7 @@ Status HeapSorter::append_block(Block* block) {
if (_heap_size == _heap->size()) {
{
SCOPED_TIMER(_topn_filter_timer);
- _do_filter(block_view->value(), num_rows);
+ RETURN_IF_CATCH_EXCEPTION(_do_filter(block_view->value(), num_rows));
}
size_t remain_rows = block_view->value().block.rows();
_topn_filter_rows += (num_rows - remain_rows);
@@ -155,6 +155,7 @@ Field HeapSorter::get_top_value() {
return field;
}
+// need exception safety
void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t num_rows) {
const auto& top_cursor = _heap->top();
const int cursor_rid = top_cursor.row_id();
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index b5315a1489..2e630fb44c 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -305,13 +305,9 @@ Status FullSorter::append_block(Block* block) {
DCHECK(data[i].type->equals(*(arrival_data[i].type)))
<< " type1: " << data[i].type->get_name()
<< " type2: " << arrival_data[i].type->get_name();
- try {
- //TODO: to eliminate unnecessary expansion, we need a `insert_range_from_const` for every column type.
- RETURN_IF_CATCH_BAD_ALLOC(data[i].column->assume_mutable()->insert_range_from(
- *arrival_data[i].column->convert_to_full_column_if_const(), 0, sz));
- } catch (const doris::Exception& e) {
- return Status::Error(e.code(), e.to_string());
- }
+ //TODO: to eliminate unnecessary expansion, we need a `insert_range_from_const` for every column type.
+ RETURN_IF_CATCH_EXCEPTION(data[i].column->assume_mutable()->insert_range_from(
+ *arrival_data[i].column->convert_to_full_column_if_const(), 0, sz));
}
block->clear_column_data();
}
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 0625718767..63b79d7965 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -738,7 +738,7 @@ Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to
for (size_t i = 0; i < size; ++i) {
filter_data[i] &= !null_map[i];
}
- filter_block_internal(block, columns_to_filter, filter);
+ RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
} else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
bool ret = const_column->get_bool(0);
if (!ret) {
@@ -750,7 +750,7 @@ Status Block::filter_block(Block* block, const std::vector<uint32_t>& columns_to
const IColumn::Filter& filter =
assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*filter_column)
.get_data();
- filter_block_internal(block, columns_to_filter, filter);
+ RETURN_IF_CATCH_EXCEPTION(filter_block_internal(block, columns_to_filter, filter));
}
erase_useless_column(block, column_to_keep);
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 36810edd28..dba2e62ea3 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -107,6 +107,7 @@ public:
ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; }
const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; }
+ // need exception safety
Status copy_column_data_to_block(doris::vectorized::IColumn* input_col_ptr,
uint16_t* sel_rowid_idx, uint16_t select_size, int block_cid,
size_t batch_size) {
@@ -264,9 +265,11 @@ public:
void append_block_by_selector(MutableBlock* dst, const IColumn::Selector& selector) const;
+ // need exception safety
static void filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter,
const IColumn::Filter& filter);
+ // need exception safety
static void filter_block_internal(Block* block, const IColumn::Filter& filter,
uint32_t column_to_keep);
diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp
index b1cca3a851..a501253c63 100644
--- a/be/src/vec/core/block_spill_writer.cpp
+++ b/be/src/vec/core/block_spill_writer.cpp
@@ -87,14 +87,12 @@ Status BlockSpillWriter::write(const Block& block) {
auto& dst_data = tmp_block_.get_columns_with_type_and_name();
size_t block_rows = std::min(rows - row_idx, batch_size_);
- try {
+ RETURN_IF_CATCH_EXCEPTION({
for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx) {
dst_data[col_idx].column->assume_mutable()->insert_range_from(
*src_data[col_idx].column, row_idx, block_rows);
}
- } catch (const doris::Exception& e) {
- return Status::Error(e.code(), e.to_string());
- }
+ });
RETURN_IF_ERROR(_write_internal(tmp_block_));
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 01834f8345..aced2c022d 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -38,6 +38,7 @@ public:
_reset();
}
+ // need exception safety
void filter_block(IColumn::Filter& filter) {
Block::filter_block_internal(&block, filter, block.columns());
_reset();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0604106b03..29c2f488a5 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -272,11 +272,12 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
if (_position_delete_ctx.has_filter) {
filters.push_back(_pos_delete_filter_ptr.get());
}
- RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(_filter_conjuncts, filters, block,
- columns_to_filter, column_to_keep));
+ RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(
+ _filter_conjuncts, filters, block, columns_to_filter, column_to_keep)));
_convert_dict_cols_to_string_cols(block);
} else {
- RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter));
+ RETURN_IF_CATCH_EXCEPTION(
+ RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter)));
}
*read_rows = block->rows();
@@ -440,8 +441,8 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
// generated from next batch, so the filter column is removed ahead.
DCHECK_EQ(block->rows(), 0);
} else {
- RETURN_IF_ERROR(_filter_block_internal(block, _lazy_read_ctx.all_predicate_col_ids,
- result_filter));
+ RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_filter_block_internal(
+ block, _lazy_read_ctx.all_predicate_col_ids, result_filter)));
Block::erase_useless_column(block, origin_column_num);
}
} else {
@@ -633,6 +634,7 @@ Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
return Status::OK();
}
+// need exception safety
Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
const std::vector<uint32_t>& columns_to_filter) {
if (_pos_delete_filter_ptr) {
@@ -644,6 +646,7 @@ Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
return Status::OK();
}
+// need exception safety
Status RowGroupReader::_filter_block_internal(Block* block,
const std::vector<uint32_t>& columns_to_filter,
const IColumn::Filter& filter) {
@@ -734,8 +737,8 @@ Status RowGroupReader::_rewrite_dict_predicates() {
temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
}
std::vector<IColumn::Filter*> filters;
- RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(*ctxs, filters, &temp_block,
- columns_to_filter, column_to_keep));
+ RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_execute_conjuncts_and_filter_block(
+ *ctxs, filters, &temp_block, columns_to_filter, column_to_keep)));
if (dict_pos != 0) {
// We have to clean the first column to insert right data.
temp_block.get_by_position(0).column->assume_mutable()->clear();
@@ -978,6 +981,7 @@ Status RowGroupReader::_execute_conjuncts(const std::vector<VExprContext*>& ctxs
}
// TODO Performance Optimization
+// need exception safety
Status RowGroupReader::_execute_conjuncts_and_filter_block(
const std::vector<VExprContext*>& ctxs, const std::vector<IColumn::Filter*>& filters,
Block* block, std::vector<uint32_t>& columns_to_filter, int column_to_keep) {
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index ba49134c03..c5fbc34b01 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -392,7 +392,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN &&
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) {
SCOPED_TIMER(_probe_side_output_timer);
- RETURN_IF_CATCH_BAD_ALLOC(
+ RETURN_IF_CATCH_EXCEPTION(
probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset,
last_probe_index, probe_size, all_match_one, false));
}
@@ -665,7 +665,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
{
SCOPED_TIMER(_probe_side_output_timer);
- RETURN_IF_CATCH_BAD_ALLOC(probe_side_output_column(
+ RETURN_IF_CATCH_EXCEPTION(probe_side_output_column(
mcol, _join_node->_left_output_slot_flags, current_offset, last_probe_index,
probe_size, all_match_one, true));
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 9c8a8a10f0..2dc9e09073 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -106,7 +106,7 @@ struct ProcessHashTableBuild {
// the hash table build bucket, which may waste a lot of memory.
// TODO, use the NDV expansion of the key column in the optimizer statistics
if (!_join_node->_build_unique) {
- RETURN_IF_CATCH_BAD_ALLOC(hash_table_ctx.hash_table.expanse_for_add_elem(
+ RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table.expanse_for_add_elem(
std::min<int>(_rows, config::hash_table_pre_expanse_max_rows)));
}
@@ -479,7 +479,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
Status st;
if (_probe_index < _probe_block.rows()) {
DCHECK(_has_set_need_null_map_for_probe);
- try {
+ RETURN_IF_CATCH_EXCEPTION({
std::visit(
[&](auto&& arg, auto&& process_hashtable_ctx, auto need_null_map_for_probe,
auto ignore_null) {
@@ -516,9 +516,7 @@ Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block* output_
*_hash_table_variants, *_process_hashtable_ctx_variants,
make_bool_variant(_need_null_map_for_probe),
make_bool_variant(_probe_ignore_null));
- } catch (const doris::Exception& e) {
- return Status::Error(e.code(), e.to_string());
- }
+ });
} else if (_probe_eos) {
if (_is_right_semi_anti || (_is_outer_join && _join_op != TJoinOp::LEFT_OUTER_JOIN)) {
std::visit(
@@ -774,7 +772,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state, vectorized::Block* in_bloc
if (in_block->rows() != 0) {
SCOPED_TIMER(_build_side_merge_block_timer);
- RETURN_IF_CATCH_BAD_ALLOC(_build_side_mutable_block.merge(*in_block));
+ RETURN_IF_CATCH_EXCEPTION(_build_side_mutable_block.merge(*in_block));
}
if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index c995929ca1..ee7d0cdc51 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -488,6 +488,7 @@ void VNestedLoopJoinNode::_reset_with_next_probe_row() {
block->get_by_position(i).column->assume_mutable()->clear(); \
}
+// need exception safety
template <typename Filter, bool SetBuildSideFlag, bool SetProbeSideFlag>
void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
Block* block, int column_to_keep, int build_block_idx, int processed_blocks_num,
@@ -517,6 +518,7 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
}
}
+// need exception safety
template <bool SetBuildSideFlag, bool SetProbeSideFlag, bool IgnoreNull>
Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block, bool materialize) {
auto column_to_keep = block->columns();
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h
index 84f5f18d69..537ed2d7f0 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -105,10 +105,11 @@ private:
}
if constexpr (set_probe_side_flag) {
- auto status =
- _do_filtering_and_update_visited_flags<set_build_side_flag,
- set_probe_side_flag, ignore_null>(
- &_join_block, !_is_left_semi_anti);
+ Status status;
+ RETURN_IF_CATCH_EXCEPTION(
+ (status = _do_filtering_and_update_visited_flags<
+ set_build_side_flag, set_probe_side_flag, ignore_null>(
+ &_join_block, !_is_left_semi_anti)));
_update_additional_flags(&_join_block);
if (!status.ok()) {
return status;
@@ -141,10 +142,11 @@ private:
}
if constexpr (!set_probe_side_flag) {
- Status status =
- _do_filtering_and_update_visited_flags<set_build_side_flag, set_probe_side_flag,
- ignore_null>(&_join_block,
- !_is_right_semi_anti);
+ Status status;
+ RETURN_IF_CATCH_EXCEPTION(
+ (status = _do_filtering_and_update_visited_flags<
+ set_build_side_flag, set_probe_side_flag, ignore_null>(
+ &_join_block, !_is_right_semi_anti)));
_update_additional_flags(&_join_block);
mutable_join_block = MutableBlock(&_join_block);
if (!status.ok()) {
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 77a0d0e7cb..c19a4b7730 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -1115,7 +1115,7 @@ Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
_agg_data->_aggregated_method_variant));
if (!ret_flag) {
- RETURN_IF_CATCH_BAD_ALLOC(_emplace_into_hash_table(_places.data(), key_columns, rows));
+ RETURN_IF_CATCH_EXCEPTION(_emplace_into_hash_table(_places.data(), key_columns, rows));
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_batch_add(
diff --git a/be/src/vec/exec/varrow_scanner.cpp b/be/src/vec/exec/varrow_scanner.cpp
index 9e3137f7c6..eae0bb0deb 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -251,7 +251,7 @@ Status VArrowScanner::get_next(vectorized::Block* block, bool* eof) {
RETURN_IF_ERROR(_cast_src_block(&_src_block));
// materialize, src block => dest columns
- return _fill_dest_block(block, eof);
+ RETURN_IF_CATCH_EXCEPTION({ return _fill_dest_block(block, eof); });
}
// arrow type ==arrow_column_to_doris_column==> primitive type(PT0) ==cast_src_block==>
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 7d881e5ef2..ab8ee1d696 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -38,12 +38,10 @@ VExprContext::~VExprContext() {
doris::Status VExprContext::execute(doris::vectorized::Block* block, int* result_column_id) {
Status st;
- try {
+ RETURN_IF_CATCH_EXCEPTION({
st = _root->execute(this, block, result_column_id);
_last_result_column_id = *result_column_id;
- } catch (const doris::Exception& e) {
- st = Status::Error(e.code(), e.to_string());
- }
+ });
return st;
}
diff --git a/be/src/vec/functions/array/function_array_apply.cpp b/be/src/vec/functions/array/function_array_apply.cpp
index bb81f04fda..a47a28f281 100644
--- a/be/src/vec/functions/array/function_array_apply.cpp
+++ b/be/src/vec/functions/array/function_array_apply.cpp
@@ -68,8 +68,9 @@ public:
const ColumnConst& rhs_value_column =
static_cast<const ColumnConst&>(*block.get_by_position(arguments[2]).column.get());
ColumnPtr result_ptr;
- RETURN_IF_ERROR(_execute(*src_nested_column, nested_type, src_offsets, condition,
- rhs_value_column, &result_ptr));
+ RETURN_IF_CATCH_EXCEPTION(
+ RETURN_IF_ERROR(_execute(*src_nested_column, nested_type, src_offsets, condition,
+ rhs_value_column, &result_ptr)));
block.replace_by_position(result, std::move(result_ptr));
return Status::OK();
}
@@ -107,6 +108,7 @@ private:
__builtin_unreachable();
}
+ // need exception safety
template <typename T, ApplyOp op>
ColumnPtr _apply_internal(const IColumn& src_column, const ColumnArray::Offsets64& src_offsets,
const ColumnConst& cmp) {
@@ -144,6 +146,7 @@ private:
return ColumnArray::create(filtered, std::move(column_offsets));
}
+// need exception safety
#define APPLY_ALL_TYPES(src_column, src_offsets, OP, cmp, dst) \
do { \
WhichDataType which(remove_nullable(nested_type)); \
@@ -186,6 +189,7 @@ private:
} \
} while (0)
+ // need exception safety
Status _execute(const IColumn& nested_src, DataTypePtr nested_type,
const ColumnArray::Offsets64& offsets, const std::string& condition,
const ColumnConst& rhs_value_column, ColumnPtr* dst) {
diff --git a/be/src/vec/functions/array/function_array_with_constant.cpp b/be/src/vec/functions/array/function_array_with_constant.cpp
index 5ee91b97a3..bb8e789525 100644
--- a/be/src/vec/functions/array/function_array_with_constant.cpp
+++ b/be/src/vec/functions/array/function_array_with_constant.cpp
@@ -78,7 +78,8 @@ public:
}
auto clone = value->clone_empty();
clone->reserve(input_rows_count);
- value->replicate(array_sizes.data(), offset, *clone->assume_mutable().get());
+ RETURN_IF_CATCH_EXCEPTION(
+ value->replicate(array_sizes.data(), offset, *clone->assume_mutable().get()));
if (!clone->is_nullable()) {
clone = ColumnNullable::create(std::move(clone), ColumnUInt8::create(clone->size(), 0));
}
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index ad3796477d..d1bcf7fcca 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -1201,7 +1201,8 @@ Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block,
for (size_t i = 0; i < filter_col.size(); ++i) {
filter_data[i] = !_filter_bitmap.Get(i);
}
- vectorized::Block::filter_block_internal(&block, filter_col, block.columns());
+ RETURN_IF_CATCH_EXCEPTION(
+ vectorized::Block::filter_block_internal(&block, filter_col, block.columns()));
}
}
// Add block to node channel
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org