You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/02 15:12:29 UTC

[doris] 02/03: [bugfix]fix core dump on outfile with expr (#10491)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit dcb66e084fa58a9c6ce797f961dfac2fbc430fc3
Author: Pxl <95...@qq.com>
AuthorDate: Wed Jun 29 20:38:49 2022 +0800

    [bugfix]fix core dump on outfile with expr (#10491)
    
    remove log
---
 be/src/vec/runtime/vfile_result_writer.cpp | 34 +++++++++++++++++++-----------
 be/src/vec/runtime/vfile_result_writer.h   | 14 ++++++------
 be/src/vec/sink/vresult_file_sink.cpp      | 13 ++++++------
 be/src/vec/sink/vresult_file_sink.h        | 18 ++++++++--------
 4 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 71a748e565..6d4ecb8db1 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/runtime/vfile_result_writer.h"
 
+#include "common/status.h"
 #include "exprs/expr_context.h"
 #include "gutil/strings/numbers.h"
 #include "gutil/strings/substitute.h"
@@ -35,22 +36,23 @@
 #include "util/mysql_global.h"
 #include "util/mysql_row_buffer.h"
 #include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
 const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
 using doris::operator<<;
 
-VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
-                                     const TStorageBackendType::type storage_type,
-                                     const TUniqueId fragment_instance_id,
-                                     const std::vector<ExprContext*>& output_expr_ctxs,
-                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
-                                     Block* output_block, bool output_object_data,
-                                     const RowDescriptor& output_row_descriptor)
+VFileResultWriter::VFileResultWriter(
+        const ResultFileOptions* file_opts, const TStorageBackendType::type storage_type,
+        const TUniqueId fragment_instance_id,
+        const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
+        RuntimeProfile* parent_profile, BufferControlBlock* sinker, Block* output_block,
+        bool output_object_data, const RowDescriptor& output_row_descriptor)
         : _file_opts(file_opts),
           _storage_type(storage_type),
           _fragment_instance_id(fragment_instance_id),
-          _output_expr_ctxs(output_expr_ctxs),
+          _output_vexpr_ctxs(output_vexpr_ctxs),
           _parent_profile(parent_profile),
           _sinker(sinker),
           _output_block(output_block),
@@ -196,7 +198,16 @@ Status VFileResultWriter::append_block(Block& block) {
     if (_parquet_writer != nullptr) {
         return Status::NotSupported("Parquet Writer is not supported yet!");
     } else {
-        RETURN_IF_ERROR(_write_csv_file(block));
+        Status status = Status::OK();
+        // Execute the vectorized output exprs here to speed up. If output_block.rows() == 0,
+        // expr execution failed (or produced no rows), so just return the resulting status.
+        auto output_block = VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
+                                                                               block, status);
+        auto num_rows = output_block.rows();
+        if (UNLIKELY(num_rows == 0)) {
+            return status;
+        }
+        RETURN_IF_ERROR(_write_csv_file(output_block));
     }
 
     _written_rows += block.rows();
@@ -210,7 +221,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) {
             if (col.column->is_null_at(i)) {
                 _plain_text_outstream << NULL_IN_CSV;
             } else {
-                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                switch (_output_vexpr_ctxs[col_id]->root()->type().type) {
                 case TYPE_BOOLEAN:
                 case TYPE_TINYINT:
                     _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>(
@@ -280,8 +291,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) {
                             reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
                                     ->value);
                     std::string decimal_str;
-                    int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
-                    decimal_str = decimal_val.to_string(output_scale);
+                    decimal_str = decimal_val.to_string();
                     _plain_text_outstream << decimal_str;
                     break;
                 }
diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h
index b7fd2cd737..5f0bb7971e 100644
--- a/be/src/vec/runtime/vfile_result_writer.h
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -30,22 +30,22 @@ public:
     VFileResultWriter(const ResultFileOptions* file_option,
                       const TStorageBackendType::type storage_type,
                       const TUniqueId fragment_instance_id,
-                      const std::vector<ExprContext*>& output_expr_ctxs,
+                      const std::vector<VExprContext*>& _output_vexpr_ctxs,
                       RuntimeProfile* parent_profile, BufferControlBlock* sinker,
                       Block* output_block, bool output_object_data,
                       const RowDescriptor& output_row_descriptor);
     virtual ~VFileResultWriter() = default;
 
-    virtual Status append_block(Block& block) override;
-    virtual Status append_row_batch(const RowBatch* batch) override {
+    Status append_block(Block& block) override;
+    Status append_row_batch(const RowBatch* batch) override {
         return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!");
     };
 
-    virtual Status init(RuntimeState* state) override;
-    virtual Status close() override;
+    Status init(RuntimeState* state) override;
+    Status close() override;
 
     // file result writer always return statistic result in one row
-    virtual int64_t get_written_rows() const override { return 1; }
+    int64_t get_written_rows() const override { return 1; }
 
 private:
     Status _write_csv_file(const Block& block);
@@ -77,7 +77,7 @@ private:
     const ResultFileOptions* _file_opts;
     TStorageBackendType::type _storage_type;
     TUniqueId _fragment_instance_id;
-    const std::vector<ExprContext*>& _output_expr_ctxs;
+    const std::vector<VExprContext*>& _output_vexpr_ctxs;
 
     // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
     // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 6d8d994585..b939332e39 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -77,9 +77,10 @@ Status VResultFileSink::init(const TDataSink& tsink) {
 
 Status VResultFileSink::prepare_exprs(RuntimeState* state) {
     // From the thrift expressions create the real exprs.
-    RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs));
+    RETURN_IF_ERROR(
+            VExpr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_vexpr_ctxs));
     // Prepare the exprs to run.
-    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker));
+    RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc, _expr_mem_tracker));
     return Status::OK();
 }
 
@@ -100,7 +101,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
                 state->fragment_instance_id(), _buf_size, &_sender));
         // create writer
         _writer.reset(new (std::nothrow) VFileResultWriter(
-                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
                 _profile, _sender.get(), nullptr, state->return_object_data_as_binary(),
                 _output_row_descriptor));
     } else {
@@ -115,7 +116,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
         // create writer
         _output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1));
         _writer.reset(new (std::nothrow) VFileResultWriter(
-                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
                 _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(),
                 _output_row_descriptor));
     }
@@ -127,7 +128,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
 }
 
 Status VResultFileSink::open(RuntimeState* state) {
-    return Expr::open(_output_expr_ctxs, state);
+    return VExpr::open(_output_vexpr_ctxs, state);
 }
 
 Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) {
@@ -186,7 +187,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
         _output_block->clear();
     }
 
-    Expr::close(_output_expr_ctxs, state);
+    VExpr::close(_output_vexpr_ctxs, state);
 
     _closed = true;
     return Status::OK();
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
index e924883b42..f550fa3585 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -34,18 +34,18 @@ public:
                     const std::vector<TPlanFragmentDestination>& destinations,
                     int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
                     const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
-    virtual ~VResultFileSink() = default;
-    virtual Status init(const TDataSink& thrift_sink) override;
-    virtual Status prepare(RuntimeState* state) override;
-    virtual Status open(RuntimeState* state) override;
+    ~VResultFileSink() override = default;
+    Status init(const TDataSink& thrift_sink) override;
+    Status prepare(RuntimeState* state) override;
+    Status open(RuntimeState* state) override;
     // send data in 'batch' to this backend stream mgr
     // Blocks until all rows in batch are placed in the buffer
-    virtual Status send(RuntimeState* state, RowBatch* batch) override;
-    virtual Status send(RuntimeState* state, Block* block) override;
+    Status send(RuntimeState* state, RowBatch* batch) override;
+    Status send(RuntimeState* state, Block* block) override;
     // Flush all buffered data and close all existing channels to destination
     // hosts. Further send() calls are illegal after calling close().
-    virtual Status close(RuntimeState* state, Status exec_status) override;
-    virtual RuntimeProfile* profile() override { return _profile; }
+    Status close(RuntimeState* state, Status exec_status) override;
+    RuntimeProfile* profile() override { return _profile; }
 
     void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
 
@@ -57,7 +57,7 @@ private:
 
     // Owned by the RuntimeState.
     const std::vector<TExpr>& _t_output_expr;
-    std::vector<ExprContext*> _output_expr_ctxs;
+    std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
     RowDescriptor _output_row_descriptor;
 
     std::unique_ptr<Block> _output_block = nullptr;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org