You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/07/02 15:12:29 UTC
[doris] 02/03: [bugfix]fix core dump on outfile with expr (#10491)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/doris.git
commit dcb66e084fa58a9c6ce797f961dfac2fbc430fc3
Author: Pxl <95...@qq.com>
AuthorDate: Wed Jun 29 20:38:49 2022 +0800
[bugfix]fix core dump on outfile with expr (#10491)
remove log
---
be/src/vec/runtime/vfile_result_writer.cpp | 34 +++++++++++++++++++-----------
be/src/vec/runtime/vfile_result_writer.h | 14 ++++++------
be/src/vec/sink/vresult_file_sink.cpp | 13 ++++++------
be/src/vec/sink/vresult_file_sink.h | 18 ++++++++--------
4 files changed, 45 insertions(+), 34 deletions(-)
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 71a748e565..6d4ecb8db1 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -17,6 +17,7 @@
#include "vec/runtime/vfile_result_writer.h"
+#include "common/status.h"
#include "exprs/expr_context.h"
#include "gutil/strings/numbers.h"
#include "gutil/strings/substitute.h"
@@ -35,22 +36,23 @@
#include "util/mysql_global.h"
#include "util/mysql_row_buffer.h"
#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
namespace doris::vectorized {
const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
using doris::operator<<;
-VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
- const TStorageBackendType::type storage_type,
- const TUniqueId fragment_instance_id,
- const std::vector<ExprContext*>& output_expr_ctxs,
- RuntimeProfile* parent_profile, BufferControlBlock* sinker,
- Block* output_block, bool output_object_data,
- const RowDescriptor& output_row_descriptor)
+VFileResultWriter::VFileResultWriter(
+ const ResultFileOptions* file_opts, const TStorageBackendType::type storage_type,
+ const TUniqueId fragment_instance_id,
+ const std::vector<vectorized::VExprContext*>& output_vexpr_ctxs,
+ RuntimeProfile* parent_profile, BufferControlBlock* sinker, Block* output_block,
+ bool output_object_data, const RowDescriptor& output_row_descriptor)
: _file_opts(file_opts),
_storage_type(storage_type),
_fragment_instance_id(fragment_instance_id),
- _output_expr_ctxs(output_expr_ctxs),
+ _output_vexpr_ctxs(output_vexpr_ctxs),
_parent_profile(parent_profile),
_sinker(sinker),
_output_block(output_block),
@@ -196,7 +198,16 @@ Status VFileResultWriter::append_block(Block& block) {
if (_parquet_writer != nullptr) {
return Status::NotSupported("Parquet Writer is not supported yet!");
} else {
- RETURN_IF_ERROR(_write_csv_file(block));
+ Status status = Status::OK();
+ // Execute the vectorized output exprs here to speed things up. If the resulting
+ // output block has zero rows, expr execution is taken to have failed, so just
+ // return the error status.
+ auto output_block = VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
+ block, status);
+ auto num_rows = output_block.rows();
+ if (UNLIKELY(num_rows == 0)) {
+ return status;
+ }
+ RETURN_IF_ERROR(_write_csv_file(output_block));
}
_written_rows += block.rows();
@@ -210,7 +221,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) {
if (col.column->is_null_at(i)) {
_plain_text_outstream << NULL_IN_CSV;
} else {
- switch (_output_expr_ctxs[col_id]->root()->type().type) {
+ switch (_output_vexpr_ctxs[col_id]->root()->type().type) {
case TYPE_BOOLEAN:
case TYPE_TINYINT:
_plain_text_outstream << (int)*reinterpret_cast<const int8_t*>(
@@ -280,8 +291,7 @@ Status VFileResultWriter::_write_csv_file(const Block& block) {
reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
->value);
std::string decimal_str;
- int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
- decimal_str = decimal_val.to_string(output_scale);
+ decimal_str = decimal_val.to_string();
_plain_text_outstream << decimal_str;
break;
}
diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h
index b7fd2cd737..5f0bb7971e 100644
--- a/be/src/vec/runtime/vfile_result_writer.h
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -30,22 +30,22 @@ public:
VFileResultWriter(const ResultFileOptions* file_option,
const TStorageBackendType::type storage_type,
const TUniqueId fragment_instance_id,
- const std::vector<ExprContext*>& output_expr_ctxs,
+ const std::vector<VExprContext*>& _output_vexpr_ctxs,
RuntimeProfile* parent_profile, BufferControlBlock* sinker,
Block* output_block, bool output_object_data,
const RowDescriptor& output_row_descriptor);
virtual ~VFileResultWriter() = default;
- virtual Status append_block(Block& block) override;
- virtual Status append_row_batch(const RowBatch* batch) override {
+ Status append_block(Block& block) override;
+ Status append_row_batch(const RowBatch* batch) override {
return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!");
};
- virtual Status init(RuntimeState* state) override;
- virtual Status close() override;
+ Status init(RuntimeState* state) override;
+ Status close() override;
// The file result writer always returns its statistics result as a single row.
- virtual int64_t get_written_rows() const override { return 1; }
+ int64_t get_written_rows() const override { return 1; }
private:
Status _write_csv_file(const Block& block);
@@ -77,7 +77,7 @@ private:
const ResultFileOptions* _file_opts;
TStorageBackendType::type _storage_type;
TUniqueId _fragment_instance_id;
- const std::vector<ExprContext*>& _output_expr_ctxs;
+ const std::vector<VExprContext*>& _output_vexpr_ctxs;
// If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
// If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index 6d8d994585..b939332e39 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -77,9 +77,10 @@ Status VResultFileSink::init(const TDataSink& tsink) {
Status VResultFileSink::prepare_exprs(RuntimeState* state) {
// From the thrift expressions create the real exprs.
- RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs));
+ RETURN_IF_ERROR(
+ VExpr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_vexpr_ctxs));
// Prepare the exprs to run.
- RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker));
+ RETURN_IF_ERROR(VExpr::prepare(_output_vexpr_ctxs, state, _row_desc, _expr_mem_tracker));
return Status::OK();
}
@@ -100,7 +101,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
state->fragment_instance_id(), _buf_size, &_sender));
// create writer
_writer.reset(new (std::nothrow) VFileResultWriter(
- _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+ _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
_profile, _sender.get(), nullptr, state->return_object_data_as_binary(),
_output_row_descriptor));
} else {
@@ -115,7 +116,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
// create writer
_output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1));
_writer.reset(new (std::nothrow) VFileResultWriter(
- _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+ _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_vexpr_ctxs,
_profile, nullptr, _output_block.get(), state->return_object_data_as_binary(),
_output_row_descriptor));
}
@@ -127,7 +128,7 @@ Status VResultFileSink::prepare(RuntimeState* state) {
}
Status VResultFileSink::open(RuntimeState* state) {
- return Expr::open(_output_expr_ctxs, state);
+ return VExpr::open(_output_vexpr_ctxs, state);
}
Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) {
@@ -186,7 +187,7 @@ Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
_output_block->clear();
}
- Expr::close(_output_expr_ctxs, state);
+ VExpr::close(_output_vexpr_ctxs, state);
_closed = true;
return Status::OK();
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
index e924883b42..f550fa3585 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -34,18 +34,18 @@ public:
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
- virtual ~VResultFileSink() = default;
- virtual Status init(const TDataSink& thrift_sink) override;
- virtual Status prepare(RuntimeState* state) override;
- virtual Status open(RuntimeState* state) override;
+ ~VResultFileSink() override = default;
+ Status init(const TDataSink& thrift_sink) override;
+ Status prepare(RuntimeState* state) override;
+ Status open(RuntimeState* state) override;
// send data in 'batch' to this backend stream mgr
// Blocks until all rows in batch are placed in the buffer
- virtual Status send(RuntimeState* state, RowBatch* batch) override;
- virtual Status send(RuntimeState* state, Block* block) override;
+ Status send(RuntimeState* state, RowBatch* batch) override;
+ Status send(RuntimeState* state, Block* block) override;
// Flush all buffered data and close all existing channels to destination
// hosts. Further send() calls are illegal after calling close().
- virtual Status close(RuntimeState* state, Status exec_status) override;
- virtual RuntimeProfile* profile() override { return _profile; }
+ Status close(RuntimeState* state, Status exec_status) override;
+ RuntimeProfile* profile() override { return _profile; }
void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
@@ -57,7 +57,7 @@ private:
// Owned by the RuntimeState.
const std::vector<TExpr>& _t_output_expr;
- std::vector<ExprContext*> _output_expr_ctxs;
+ std::vector<vectorized::VExprContext*> _output_vexpr_ctxs;
RowDescriptor _output_row_descriptor;
std::unique_ptr<Block> _output_block = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org