You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text version.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/09 16:17:57 UTC

[doris] 05/13: [improvement](exception-safe) create and prepare node/sink support exception safe (#20551)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 99615e31cbaf1fd8ca3df501c6bf9d800a50eb00
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Jun 9 21:06:59 2023 +0800

    [improvement](exception-safe) create and prepare node/sink support exception safe (#20551)
---
 be/src/common/exception.h                          | 22 ++++++++++
 be/src/exec/data_sink.cpp                          | 49 ++++++++--------------
 be/src/olap/rowset/segment_v2/page_io.cpp          |  2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |  2 +-
 be/src/runtime/plan_fragment_executor.cpp          | 13 +++---
 be/src/vec/core/block.cpp                          |  4 +-
 be/src/vec/data_types/data_type_decimal.cpp        | 11 +++--
 be/src/vec/data_types/data_type_time_v2.cpp        |  4 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  4 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  |  4 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             | 13 ++----
 be/src/vec/exec/vsort_node.cpp                     |  4 +-
 12 files changed, 70 insertions(+), 62 deletions(-)

diff --git a/be/src/common/exception.h b/be/src/common/exception.h
index 55eef571bc..3166d83844 100644
--- a/be/src/common/exception.h
+++ b/be/src/common/exception.h
@@ -92,3 +92,25 @@ inline std::string Exception::to_string() const {
             }                                                                                    \
         }                                                                                        \
     } while (0)
+
+#define RETURN_IF_ERROR_OR_CATCH_EXCEPTION(stmt)                                                 \
+    do {                                                                                         \
+        try {                                                                                    \
+            doris::enable_thread_catch_bad_alloc++;                                              \
+            Defer defer {[&]() { doris::enable_thread_catch_bad_alloc--; }};                     \
+            {                                                                                    \
+                Status _status_ = (stmt);                                                        \
+                if (UNLIKELY(!_status_.ok())) {                                                  \
+                    return _status_;                                                             \
+                }                                                                                \
+            }                                                                                    \
+        } catch (const doris::Exception& e) {                                                    \
+            if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {                                \
+                return Status::MemoryLimitExceeded(fmt::format(                                  \
+                        "PreCatch error code:{}, {}, __FILE__:{}, __LINE__:{}, __FUNCTION__:{}", \
+                        e.code(), e.to_string(), __FILE__, __LINE__, __PRETTY_FUNCTION__));      \
+            } else {                                                                             \
+                return Status::Error(e.code(), e.to_string());                                   \
+            }                                                                                    \
+        }                                                                                        \
+    } while (0)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index b0d75067e0..c5d5446611 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -50,8 +50,6 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                                   const TPlanFragmentExecParams& params,
                                   const RowDescriptor& row_desc, RuntimeState* state,
                                   std::unique_ptr<DataSink>* sink, DescriptorTbl& desc_tbl) {
-    DataSink* tmp_sink = nullptr;
-
     switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK: {
         if (!thrift_sink.__isset.stream_sink) {
@@ -62,11 +60,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         ? params.send_query_statistics_with_every_batch
                         : false;
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new vectorized::VDataStreamSender(
+        sink->reset(new vectorized::VDataStreamSender(
                 state, pool, params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch);
+                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -75,9 +72,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                      thrift_sink.result_sink, 4096);
-        sink->reset(tmp_sink);
+        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
+                                                       thrift_sink.result_sink, 4096));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
@@ -92,17 +88,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
                     params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl);
+                    output_exprs, desc_tbl));
         } else {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
-                    send_query_statistics_with_every_batch, output_exprs);
+                    send_query_statistics_with_every_batch, output_exprs));
         }
-
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::MEMORY_SCRATCH_SINK: {
@@ -110,8 +104,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
 
-        tmp_sink = new vectorized::MemoryScratchSink(row_desc, output_exprs);
-        sink->reset(tmp_sink);
+        sink->reset(new vectorized::MemoryScratchSink(row_desc, output_exprs));
         break;
     }
     case TDataSinkType::MYSQL_TABLE_SINK: {
@@ -193,7 +186,6 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                                   const size_t& local_param_idx, const RowDescriptor& row_desc,
                                   RuntimeState* state, std::unique_ptr<DataSink>* sink,
                                   DescriptorTbl& desc_tbl) {
-    DataSink* tmp_sink = nullptr;
     const auto& local_params = params.local_params[local_param_idx];
     switch (thrift_sink.type) {
     case TDataSinkType::DATA_STREAM_SINK: {
@@ -205,11 +197,10 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         ? params.send_query_statistics_with_every_batch
                         : false;
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new vectorized::VDataStreamSender(
+        sink->reset(new vectorized::VDataStreamSender(
                 state, pool, local_params.sender_id, row_desc, thrift_sink.stream_sink,
-                params.destinations, 16 * 1024, send_query_statistics_with_every_batch);
+                params.destinations, 16 * 1024, send_query_statistics_with_every_batch));
         // RETURN_IF_ERROR(sender->prepare(state->obj_pool(), thrift_sink.stream_sink));
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::RESULT_SINK: {
@@ -218,9 +209,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         }
 
         // TODO: figure out good buffer size based on size of output row
-        tmp_sink = new doris::vectorized::VResultSink(row_desc, output_exprs,
-                                                      thrift_sink.result_sink, 4096);
-        sink->reset(tmp_sink);
+        sink->reset(new doris::vectorized::VResultSink(row_desc, output_exprs,
+                                                       thrift_sink.result_sink, 4096));
         break;
     }
     case TDataSinkType::RESULT_FILE_SINK: {
@@ -235,17 +225,15 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                         : false;
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
                     params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
-                    output_exprs, desc_tbl);
+                    output_exprs, desc_tbl));
         } else {
-            tmp_sink = new doris::vectorized::VResultFileSink(
+            sink->reset(new doris::vectorized::VResultFileSink(
                     pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
-                    send_query_statistics_with_every_batch, output_exprs);
+                    send_query_statistics_with_every_batch, output_exprs));
         }
-
-        sink->reset(tmp_sink);
         break;
     }
     case TDataSinkType::MEMORY_SCRATCH_SINK: {
@@ -253,8 +241,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
             return Status::InternalError("Missing data buffer sink.");
         }
 
-        tmp_sink = new vectorized::MemoryScratchSink(row_desc, output_exprs);
-        sink->reset(tmp_sink);
+        sink->reset(new vectorized::MemoryScratchSink(row_desc, output_exprs));
         break;
     }
     case TDataSinkType::MYSQL_TABLE_SINK: {
diff --git a/be/src/olap/rowset/segment_v2/page_io.cpp b/be/src/olap/rowset/segment_v2/page_io.cpp
index 2f27375fe5..3016542372 100644
--- a/be/src/olap/rowset/segment_v2/page_io.cpp
+++ b/be/src/olap/rowset/segment_v2/page_io.cpp
@@ -51,7 +51,7 @@ Status PageIO::compress_page_body(BlockCompressionCodec* codec, double min_space
     size_t uncompressed_size = Slice::compute_total_size(body);
     if (codec != nullptr && !codec->exceed_max_compress_len(uncompressed_size)) {
         faststring buf;
-        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(codec->compress(body, uncompressed_size, &buf)));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(body, uncompressed_size, &buf));
         double space_saving = 1.0 - static_cast<double>(buf.size()) / uncompressed_size;
         // return compressed body only when it saves more than min_space_saving
         if (space_saving > 0 && space_saving >= min_space_saving) {
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index ebb30fe384..edf0dc20cd 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -309,7 +309,7 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re
     _runtime_state->set_num_per_fragment_instances(request.num_senders);
 
     if (request.fragment.__isset.output_sink) {
-        RETURN_IF_ERROR(DataSink::create_data_sink(
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
                 _runtime_state->obj_pool(), request.fragment.output_sink,
                 request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
                 _runtime_state.get(), &_sink, *desc_tbl));
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 079719927b..49b1afdc50 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -160,8 +160,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
     // set up plan
     DCHECK(request.__isset.fragment);
-    RETURN_IF_ERROR(ExecNode::create_tree(_runtime_state.get(), obj_pool(), request.fragment.plan,
-                                          *desc_tbl, &_plan));
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ExecNode::create_tree(
+            _runtime_state.get(), obj_pool(), request.fragment.plan, *desc_tbl, &_plan));
 
     // set #senders of exchange nodes before calling Prepare()
     std::vector<ExecNode*> exch_nodes;
@@ -173,6 +173,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
         static_cast<doris::vectorized::VExchangeNode*>(exch_node)->set_num_senders(num_senders);
     }
 
+    // TODO Is it exception safe?
     RETURN_IF_ERROR(_plan->prepare(_runtime_state.get()));
     // set scan ranges
     std::vector<ExecNode*> scan_nodes;
@@ -211,10 +212,10 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
     // set up sink, if required
     if (request.fragment.__isset.output_sink) {
-        RETURN_IF_ERROR(DataSink::create_data_sink(obj_pool(), request.fragment.output_sink,
-                                                   request.fragment.output_exprs, params,
-                                                   row_desc(), runtime_state(), &_sink, *desc_tbl));
-        RETURN_IF_ERROR(_sink->prepare(runtime_state()));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
+                obj_pool(), request.fragment.output_sink, request.fragment.output_exprs, params,
+                row_desc(), runtime_state(), &_sink, *desc_tbl));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sink->prepare(runtime_state()));
 
         RuntimeProfile* sink_profile = _sink->profile();
         if (sink_profile != nullptr) {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 2422ca9b16..caa5d2acd5 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -828,8 +828,8 @@ Status Block::serialize(int be_exec_version, PBlock* pblock,
         RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec));
 
         faststring buf_compressed;
-        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(codec->compress(
-                Slice(column_values.data(), content_uncompressed_size), &buf_compressed)));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress(
+                Slice(column_values.data(), content_uncompressed_size), &buf_compressed));
         size_t compressed_size = buf_compressed.size();
         if (LIKELY(compressed_size < content_uncompressed_size)) {
             pblock->set_column_values(buf_compressed.data(), buf_compressed.size());
diff --git a/be/src/vec/data_types/data_type_decimal.cpp b/be/src/vec/data_types/data_type_decimal.cpp
index c3bafba8aa..cf31bd1e40 100644
--- a/be/src/vec/data_types/data_type_decimal.cpp
+++ b/be/src/vec/data_types/data_type_decimal.cpp
@@ -164,13 +164,16 @@ bool DataTypeDecimal<T>::parse_from_string(const std::string& str, T* res) const
 DataTypePtr create_decimal(UInt64 precision_value, UInt64 scale_value, bool use_v2) {
     if (precision_value < min_decimal_precision() ||
         precision_value > max_decimal_precision<Decimal128>()) {
-        LOG(WARNING) << "Wrong precision " << precision_value;
-        return nullptr;
+        throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR,
+                               "Wrong precision {}, min: {}, max: {}", precision_value,
+                               min_decimal_precision(), max_decimal_precision<Decimal128>());
     }
 
     if (static_cast<UInt64>(scale_value) > precision_value) {
-        LOG(WARNING) << "Negative scales and scales larger than precision are not supported";
-        return nullptr;
+        throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR,
+                               "Negative scales and scales larger than precision are not "
+                               "supported, scale_value: {}, precision_value: {}",
+                               scale_value, precision_value);
     }
 
     if (use_v2) {
diff --git a/be/src/vec/data_types/data_type_time_v2.cpp b/be/src/vec/data_types/data_type_time_v2.cpp
index 8baf8a6776..e5b537ca34 100644
--- a/be/src/vec/data_types/data_type_time_v2.cpp
+++ b/be/src/vec/data_types/data_type_time_v2.cpp
@@ -202,8 +202,8 @@ void DataTypeDateTimeV2::cast_to_date_v2(const UInt64 from, UInt32& to) {
 
 DataTypePtr create_datetimev2(UInt64 scale_value) {
     if (scale_value > 6) {
-        LOG(WARNING) << "Wrong scale " << scale_value;
-        return nullptr;
+        throw doris::Exception(doris::ErrorCode::NOT_IMPLEMENTED_ERROR, "scale_value > 6 {}",
+                               scale_value);
     }
     return std::make_shared<DataTypeDateTimeV2>(scale_value);
 }
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 945cf09087..c3d80b6422 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -1302,8 +1302,8 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
     for (auto& conjunct : _lazy_read_ctx.conjuncts) {
         filter_conjuncts.push_back(conjunct);
     }
-    RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts(
-            filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all)));
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
+            filter_conjuncts, nullptr, block, _filter.get(), &can_filter_all));
 
     if (_lazy_read_ctx.resize_first_column) {
         block->get_by_position(0).column->assume_mutable()->clear();
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index b50dc1d0e0..06dd72eb7b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -785,8 +785,8 @@ Status RowGroupReader::_rewrite_dict_predicates() {
             // The following process may be tricky and time-consuming, but we have no other way.
             temp_block.get_by_position(0).column->assume_mutable()->resize(dict_value_column_size);
         }
-        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
-                ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep)));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts_and_filter_block(
+                ctxs, nullptr, &temp_block, columns_to_filter, column_to_keep));
         if (dict_pos != 0) {
             // We have to clean the first column to insert right data.
             temp_block.get_by_position(0).column->assume_mutable()->clear();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index b0faa7f799..43fe98ea04 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -309,16 +309,11 @@ Status VFileScanner::_init_src_block(Block* block) {
         auto it = _name_to_col_type.find(slot->col_name());
         if (it == _name_to_col_type.end() || _is_dynamic_schema) {
             // not exist in file, using type from _input_tuple_desc
-            data_type =
-                    DataTypeFactory::instance().create_data_type(slot->type(), slot->is_nullable());
+            RETURN_IF_CATCH_EXCEPTION(data_type = DataTypeFactory::instance().create_data_type(
+                                              slot->type(), slot->is_nullable()));
         } else {
-            data_type = DataTypeFactory::instance().create_data_type(it->second, true);
-        }
-        if (data_type == nullptr) {
-            return Status::NotSupported("Not support data type {} for column {}",
-                                        it == _name_to_col_type.end() ? slot->type().debug_string()
-                                                                      : it->second.debug_string(),
-                                        slot->col_name());
+            RETURN_IF_CATCH_EXCEPTION(
+                    data_type = DataTypeFactory::instance().create_data_type(it->second, true));
         }
         MutableColumnPtr data_column = data_type->create_column();
         _src_block.insert(
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 05b94b733a..b7ad692413 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -179,7 +179,7 @@ Status VSortNode::open(RuntimeState* state) {
                                   ExecNode::get_next,
                           _children[0], std::placeholders::_1, std::placeholders::_2,
                           std::placeholders::_3)));
-        RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(sink(state, upstream_block.get(), eos)));
+        RETURN_IF_ERROR_OR_CATCH_EXCEPTION(sink(state, upstream_block.get(), eos));
     } while (!eos);
 
     child(0)->close(state);
@@ -191,7 +191,7 @@ Status VSortNode::open(RuntimeState* state) {
 }
 
 Status VSortNode::pull(doris::RuntimeState* state, vectorized::Block* output_block, bool* eos) {
-    RETURN_IF_CATCH_EXCEPTION(RETURN_IF_ERROR(_sorter->get_next(state, output_block, eos)));
+    RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_sorter->get_next(state, output_block, eos));
     reached_limit(output_block, eos);
     if (*eos) {
         _runtime_profile->add_info_string("Spilled", _sorter->is_spilled() ? "true" : "false");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org