You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/11 15:17:41 UTC
[incubator-doris] 01/02: [feature](vectorized) Support outfile on vectorized engine (#10013)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 8cdccfb01a5ddd12d8ecef6f4870e8bb6235257d
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Jun 10 09:15:53 2022 +0800
[feature](vectorized) Support outfile on vectorized engine (#10013)
This PR adds support for writing query results to CSV files (OUTFILE) on the vectorized engine.
** Parquet output is not supported yet. **
---
be/src/exec/data_sink.cpp | 33 +-
be/src/vec/CMakeLists.txt | 2 +
be/src/vec/runtime/vfile_result_writer.cpp | 468 +++++++++++++++++++++++++++++
be/src/vec/runtime/vfile_result_writer.h | 124 ++++++++
be/src/vec/sink/vdata_stream_sender.cpp | 35 +++
be/src/vec/sink/vdata_stream_sender.h | 13 +-
be/src/vec/sink/vresult_file_sink.cpp | 203 +++++++++++++
be/src/vec/sink/vresult_file_sink.h | 70 +++++
8 files changed, 939 insertions(+), 9 deletions(-)
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index a0ef6336e5..a11556310a 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -38,6 +38,7 @@
#include "vec/sink/result_sink.h"
#include "vec/sink/vdata_stream_sender.h"
#include "vec/sink/vmysql_table_writer.h"
+#include "vec/sink/vresult_file_sink.h"
#include "vec/sink/vtablet_sink.h"
#include "vec/sink/vmysql_table_sink.h"
@@ -91,13 +92,35 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
if (!thrift_sink.__isset.result_file_sink) {
return Status::InternalError("Missing result file sink.");
}
- // Result file sink is not the top sink
- if (params.__isset.destinations && params.destinations.size() > 0) {
- tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
- params.destinations, pool, params.sender_id, desc_tbl);
+
+ // TODO: figure out good buffer size based on size of output row
+ if (is_vec) {
+ bool send_query_statistics_with_every_batch =
+ params.__isset.send_query_statistics_with_every_batch
+ ? params.send_query_statistics_with_every_batch
+ : false;
+ // Result file sink is not the top sink
+ if (params.__isset.destinations && params.destinations.size() > 0) {
+ tmp_sink = new doris::vectorized::VResultFileSink(
+ pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
+ params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
+ output_exprs, desc_tbl);
+ } else {
+ tmp_sink = new doris::vectorized::VResultFileSink(
+ pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+ send_query_statistics_with_every_batch, output_exprs);
+ }
} else {
- tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
+ // Result file sink is not the top sink
+ if (params.__isset.destinations && params.destinations.size() > 0) {
+ tmp_sink =
+ new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
+ params.destinations, pool, params.sender_id, desc_tbl);
+ } else {
+ tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
+ }
}
+
sink->reset(tmp_sink);
break;
}
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 2d30b33f50..afc95e77a9 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -173,9 +173,11 @@ set(VEC_FILES
sink/vtablet_sink.cpp
sink/vmysql_table_writer.cpp
sink/vmysql_table_sink.cpp
+ sink/vresult_file_sink.cpp
runtime/vdatetime_value.cpp
runtime/vdata_stream_recvr.cpp
runtime/vdata_stream_mgr.cpp
+ runtime/vfile_result_writer.cpp
runtime/vpartition_info.cpp
runtime/vsorted_run_merger.cpp)
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
new file mode 100644
index 0000000000..8d70baa854
--- /dev/null
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "exec/broker_writer.h"
+#include "exec/file_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/columns/column_string.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+using doris::operator<<;
+// Threshold (1 MiB) at which _flush_plain_text_outstream(false) hands the buffered CSV text to
+// the file writer; smaller buffers are only written when eos is requested.
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1UL << 20;
+
+// Stores the export options and collaborators. The sinker, output block and parent profile are
+// not owned by this writer. No file is opened here; init() creates the first output file.
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    // Not declared in this class (it is inherited), so it cannot be set in the initializer list.
+    this->_output_object_data = output_object_data;
+}
+
+// Binds the runtime state, registers the profile counters and opens the first output file.
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    RETURN_IF_ERROR(_create_next_file_writer());
+    return Status::OK();
+}
+
+// Creates a "VFileResultWriter" child of the parent profile and registers the timers/counters
+// this writer updates. TupleConvertTime and FileWriteTime are nested under AppendBatchTime.
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* writer_profile =
+            _parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(writer_profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(writer_profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(writer_profile, "FileWriteTime", "AppendBatchTime");
+    _writer_close_timer = ADD_TIMER(writer_profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(writer_profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(writer_profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+// Creates the empty success marker file after all data files have been written.
+// The marker's writer is closed directly. Going through _close_file_writer(true) would call
+// _create_success_file() again, because success_file_name is set. For non-LOCAL storage that
+// recursion never ends; for LOCAL storage it fails because the marker file now exists.
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _file_writer_impl->close();
+}
+
+// Builds the success marker path "<file_path><success_file_name>". For LOCAL storage, an
+// already existing path is rejected so that an existing file is never overwritten.
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    *file_name = _file_opts->file_path + _file_opts->success_file_name;
+    if (_storage_type != TStorageBackendType::LOCAL) {
+        return Status::OK();
+    }
+    // The local directory is chosen by the user and Doris does not validate it; this
+    // existence check is only a simple guard against clobbering a file that is already there.
+    if (FileUtils::check_exist(*file_name)) {
+        return Status::InternalError("File already exists: " + *file_name +
+                                     ". Host: " + BackendOptions::get_localhost());
+    }
+    return Status::OK();
+}
+
+// Opens a writer for the next numbered data file.
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string next_file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&next_file_name));
+    return _create_file_writer(next_file_name);
+}
+
+// Creates and opens a writer for `file_name` on the configured storage backend (LOCAL, BROKER,
+// S3 or HDFS), then verifies the output format is supported (only plain CSV for now).
+// Returns an error for an unknown backend instead of dereferencing an unset writer.
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        _file_writer_impl.reset(new LocalFileWriter(file_name, 0 /* start offset */));
+    } else if (_storage_type == TStorageBackendType::BROKER) {
+        _file_writer_impl.reset(new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
+                                                 _file_opts->broker_properties, file_name,
+                                                 0 /*start offset*/));
+    } else if (_storage_type == TStorageBackendType::S3) {
+        _file_writer_impl.reset(
+                new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */));
+    } else if (_storage_type == TStorageBackendType::HDFS) {
+        FileWriter* tmp_writer = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+                const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties),
+                file_name, &tmp_writer));
+        _file_writer_impl.reset(tmp_writer);
+    } else {
+        // Previously an unknown backend fell through, and open() below was then called on a
+        // null writer (first file) or on the previous, already closed writer.
+        return Status::InternalError(
+                strings::Substitute("unsupported storage type: $0", _storage_type));
+    }
+
+    RETURN_IF_ERROR(_file_writer_impl->open());
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        // CSV text is written directly through the file writer; nothing else to set up.
+        break;
+    case TFileFormatType::FORMAT_PARQUET:
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    default:
+        return Status::InternalError(
+                strings::Substitute("unsupported file format: $0", _file_opts->file_format));
+    }
+    LOG(INFO) << "create file for exporting query result. file name: " << file_name
+              << ". query id: " << print_id(_state->query_id())
+              << " format:" << _file_opts->file_format;
+    return Status::OK();
+}
+
+// Builds the next data file name "{file_path}{fragment_instance_id}_{file_idx}.{ext}", where ext
+// comes from _file_format_to_name(). For example, with file_path "my_prefix_" the first file is
+// "my_prefix_<instance id>_0.csv". Advances _file_idx and resets _header_sent for the new file.
+// For LOCAL storage, an already existing file is reported as an error.
+Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
+    const int current_idx = _file_idx++;
+    std::stringstream name_ss;
+    name_ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << current_idx
+            << "." << _file_format_to_name();
+    *file_name = name_ss.str();
+    _header_sent = false;
+    if (_storage_type != TStorageBackendType::LOCAL) {
+        return Status::OK();
+    }
+    // The local directory is chosen by the user and Doris does not validate it; this
+    // existence check is only a simple guard against overwriting an existing file.
+    if (FileUtils::check_exist(*file_name)) {
+        return Status::InternalError("File already exists: " + *file_name +
+                                     ". Host: " + BackendOptions::get_localhost());
+    }
+    return Status::OK();
+}
+
+// Returns the URL prefix shared by all files written by this fragment instance:
+//   LOCAL:               file:///{backend host}{file_path}{fragment_instance_id}_
+//   other backends:      {file_path}{fragment_instance_id}_
+Status VFileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream url_ss;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        url_ss << "file:///" << BackendOptions::get_localhost();
+    }
+    url_ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_";
+    *file_url = url_ss.str();
+    return Status::OK();
+}
+
+// File extension for exported data files; "unknown" for any other format.
+std::string VFileResultWriter::_file_format_to_name() {
+    const auto format = _file_opts->file_format;
+    if (format == TFileFormatType::FORMAT_CSV_PLAIN) {
+        return "csv";
+    }
+    if (format == TFileFormatType::FORMAT_PARQUET) {
+        return "parquet";
+    }
+    return "unknown";
+}
+
+// Appends one block to the current output file and counts its rows. Empty blocks are ignored.
+// Only CSV output is implemented; a parquet writer yields NotSupported.
+Status VFileResultWriter::append_block(Block& block) {
+    const size_t num_rows = block.rows();
+    if (num_rows == 0) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(_append_row_batch_timer);
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    }
+    RETURN_IF_ERROR(_write_csv_file(block));
+    _written_rows += num_rows;
+    return Status::OK();
+}
+
+// Serializes every row of `block` as delimited text into _plain_text_outstream.
+// - NULL cells are written as NULL_IN_CSV.
+// - Other cells are formatted by the type of the matching output expression.
+// - Cells are separated by column_separator, and each row ends with line_delimiter.
+// The buffered text is then flushed to the file writer.
+// NOTE(review): flushing with eos=true after every block means OUTSTREAM_BUFFER_SIZE_BYTES never
+// coalesces writes across blocks; switching to false needs a final flush in close().
+Status VFileResultWriter::_write_csv_file(const Block& block) {
+    for (size_t i = 0; i < block.rows(); i++) {
+        for (size_t col_id = 0; col_id < block.columns(); col_id++) {
+            // Bind by reference: a by-value `auto` copied the ColumnWithTypeAndName (column and
+            // type pointers plus the column name) once for every single cell.
+            const auto& col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    // Widen to int so the value is printed as a number, not as a character.
+                    _plain_text_outstream << static_cast<int>(*reinterpret_cast<const int8_t*>(
+                            col.column->get_data_at(i).data));
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // Convert to a string first to avoid losing precision: streaming the double
+                    // 27361919854.929001 directly prints 2.73619e+10, whereas the converted
+                    // string keeps 27361919854.929001.
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const auto* time_val = reinterpret_cast<const VecDateTimeValue*>(
+                            col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL:
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    // Raw bytes are written as-is (this includes serialized bitmap/HLL values).
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
+                    _plain_text_outstream << decimal_val.to_string(output_scale);
+                    break;
+                }
+                default: {
+                    // Any other type is not supported for export and is written as NULL.
+                    _plain_text_outstream << NULL_IN_CSV;
+                }
+                }
+            }
+            if (col_id < block.columns() - 1) {
+                _plain_text_outstream << _file_opts->column_separator;
+            }
+        }
+        _plain_text_outstream << _file_opts->line_delimiter;
+    }
+
+    return _flush_plain_text_outstream(true);
+}
+
+// Hands the buffered text to the file writer when it is non-empty and either eos is set or it
+// has reached OUTSTREAM_BUFFER_SIZE_BYTES. It then clears the buffer and rolls over to a new
+// file if the size limit has been reached.
+Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
+    SCOPED_TIMER(_file_write_timer);
+    const size_t buffered = _plain_text_outstream.tellp();
+    const bool should_flush = buffered > 0 && (eos || buffered >= OUTSTREAM_BUFFER_SIZE_BYTES);
+    if (!should_flush) {
+        return Status::OK();
+    }
+
+    const std::string& pending = _plain_text_outstream.str();
+    size_t written_len = 0;
+    RETURN_IF_ERROR(_file_writer_impl->write(reinterpret_cast<const uint8_t*>(pending.data()),
+                                             pending.size(), &written_len));
+    COUNTER_UPDATE(_written_data_bytes, written_len);
+    _current_written_bytes += written_len;
+
+    // Drop the flushed contents and reset any stream error state.
+    _plain_text_outstream.str("");
+    _plain_text_outstream.clear();
+
+    // Start a new file if this one has reached max_file_size_bytes.
+    return _create_new_file_if_exceed_size();
+}
+
+// Rolls over to a new output file once the current one has reached max_file_size_bytes:
+// closing the current writer with done=false opens the next file, then the byte count resets.
+Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    if (_current_written_bytes >= _file_opts->max_file_size_bytes) {
+        {
+            SCOPED_TIMER(_writer_close_timer);
+            RETURN_IF_ERROR(_close_file_writer(false));
+        }
+        _current_written_bytes = 0;
+    }
+    return Status::OK();
+}
+
+// Closes the current file writer.
+// - If !done, opens a writer for the next data file.
+// - If done, creates the success marker file (when success_file_name is set). It then sends the
+//   statistic row to the sinker, or stores it in _output_block when one was provided.
+// A failed close is now returned instead of ignored, so a file whose close failed is not
+// reported as successfully exported.
+Status VFileResultWriter::_close_file_writer(bool done) {
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else if (_file_writer_impl) {
+        RETURN_IF_ERROR(_file_writer_impl->close());
+    }
+
+    if (!done) {
+        // Not finished: continue in a new file.
+        RETURN_IF_ERROR(_create_next_file_writer());
+    } else {
+        // All data is written to file; publish the statistic result.
+        if (_file_opts->success_file_name != "") {
+            // The success file is just an empty marker file.
+            RETURN_IF_ERROR(_create_success_file());
+        }
+        if (_output_block == nullptr) {
+            RETURN_IF_ERROR(_send_result());
+        } else {
+            RETURN_IF_ERROR(_fill_result_block());
+        }
+    }
+    return Status::OK();
+}
+
+// Sends the single statistic row to the sinker, at most once. The row holds FileNumber,
+// TotalRows, FileSize and URL. These field types must stay consistent with the types defined in
+// OutFileClause.java on the FE.
+Status VFileResultWriter::_send_result() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+    std::string file_url;
+    _get_file_url(&file_url);
+
+    MysqlRowBuffer row_buffer;
+    row_buffer.push_int(_file_idx);                              // FileNumber
+    row_buffer.push_bigint(_written_rows_counter->value());      // TotalRows
+    row_buffer.push_bigint(_written_data_bytes->value());        // FileSize
+    row_buffer.push_string(file_url.c_str(), file_url.length()); // URL
+
+    auto result = std::make_unique<TFetchDataResult>();
+    result->result_batch.rows.resize(1);
+    result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
+    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
+    return Status::OK();
+}
+
+// Stores the final statistic row in _output_block instead of sending it to the sinker; runs at
+// most once. Column i holds: 0 = number of files, 1 = total written rows, 2 = total written bytes,
+// 3 = file URL prefix. Each column is rebuilt with a column type chosen from its output slot type
+// and then replaces the block's column at that position.
+Status VFileResultWriter::_fill_result_block() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+    // Appends the statistic value for column `idx`. Integer statistics are passed as raw bytes,
+    // so the slot's integer width is expected to match the value (INT for the file count, BIGINT
+    // for rows/bytes). NOTE(review): confirm against OutFileClause.java on the FE.
+    auto insert_stat = [this](IColumn& column, size_t idx) {
+        if (idx == 0) {
+            column.insert_data(reinterpret_cast<const char*>(&_file_idx), 0);
+        } else if (idx == 1) {
+            int64_t written_rows = _written_rows_counter->value();
+            column.insert_data(reinterpret_cast<const char*>(&written_rows), 0);
+        } else if (idx == 2) {
+            int64_t written_data_bytes = _written_data_bytes->value();
+            column.insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0);
+        } else if (idx == 3) {
+            std::string file_url;
+            _get_file_url(&file_url);
+            column.insert_data(file_url.c_str(), file_url.size());
+        }
+    };
+
+    const auto& slots = _output_row_descriptor.tuple_descriptors()[0]->slots();
+    for (size_t i = 0; i < _output_block->columns(); i++) {
+        const auto slot_type = slots[i]->type().type;
+        MutableColumnPtr column;
+        switch (slot_type) {
+        case TYPE_INT:
+            column = ColumnVector<int32_t>::create();
+            break;
+        case TYPE_BIGINT:
+            column = ColumnVector<int64_t>::create();
+            break;
+        case TYPE_LARGEINT:
+            column = ColumnVector<int128_t>::create();
+            break;
+        case TYPE_SMALLINT:
+            column = ColumnVector<int16_t>::create();
+            break;
+        case TYPE_TINYINT:
+            column = ColumnVector<int8_t>::create();
+            break;
+        case TYPE_VARCHAR:
+        case TYPE_CHAR:
+        case TYPE_STRING:
+            // Text slots (the URL) need a string column. The previous ColumnVector<int8_t> holds
+            // one integer per row, not text, and does not match the slot's string type.
+            column = ColumnString::create();
+            break;
+        default:
+            return Status::InternalError(
+                    strings::Substitute("Invalid type to print: $0", slot_type));
+        }
+        insert_stat(*column, i);
+        _output_block->replace_by_position(i, std::move(column));
+    }
+    return Status::OK();
+}
+
+// Finishes the export. It records the written row count in the profile, then, under the close
+// timer, closes the last data file, which also creates the success file and publishes the
+// statistic row.
+Status VFileResultWriter::close() {
+    // The profile updates live here rather than inside _close_file_writer(). The original design
+    // note says that function may be called from a destructor, when the RuntimeState (and the
+    // profile it owns) may already have been destroyed.
+    COUNTER_SET(_written_rows_counter, _written_rows);
+    SCOPED_TIMER(_writer_close_timer);
+    return _close_file_writer(true);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h
new file mode 100644
index 0000000000..b7fd2cd737
--- /dev/null
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/file_writer.h"
+#include "runtime/file_result_writer.h"
+#include "vec/sink/result_sink.h"
+
+namespace doris {
+
+namespace vectorized {
+// write result to file
+// Vectorized writer that exports query result blocks to files (currently CSV only). It rolls
+// over to a new file when max_file_size_bytes is reached and finally reports one statistic row.
+class VFileResultWriter final : public VResultWriter {
+public:
+    VFileResultWriter(const ResultFileOptions* file_opts,
+                      const TStorageBackendType::type storage_type,
+                      const TUniqueId fragment_instance_id,
+                      const std::vector<ExprContext*>& output_expr_ctxs,
+                      RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                      Block* output_block, bool output_object_data,
+                      const RowDescriptor& output_row_descriptor);
+    virtual ~VFileResultWriter() = default;
+
+    virtual Status append_block(Block& block) override;
+    // Row batches are not accepted by this vectorized writer.
+    virtual Status append_row_batch(const RowBatch* batch) override {
+        return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!");
+    }
+
+    virtual Status init(RuntimeState* state) override;
+    virtual Status close() override;
+
+    // The file result writer always returns its statistic result as one row.
+    virtual int64_t get_written_rows() const override { return 1; }
+
+private:
+    Status _write_csv_file(const Block& block);
+
+    // If the buffer exceeds the limit, write the data buffered in _plain_text_outstream via the
+    // file writer. If eos, write the data even if the buffer is not full.
+    Status _flush_plain_text_outstream(bool eos);
+    void _init_profile();
+
+    Status _create_file_writer(const std::string& file_name);
+    Status _create_next_file_writer();
+    Status _create_success_file();
+    // get next export file name
+    Status _get_next_file_name(std::string* file_name);
+    Status _get_success_file_name(std::string* file_name);
+    Status _get_file_url(std::string* file_url);
+    std::string _file_format_to_name();
+    // Close the current file writer. If !done, create a new writer for the next file; if done,
+    // create the success file (when configured) and send or store the final statistic row.
+    Status _close_file_writer(bool done);
+    // create a new file if current file size exceed limit
+    Status _create_new_file_if_exceed_size();
+    // send the final statistic result
+    Status _send_result();
+    // save result into _output_block rather than send it
+    Status _fill_result_block();
+
+    RuntimeState* _state = nullptr; // not owned, set in init()
+    const ResultFileOptions* _file_opts;
+    TStorageBackendType::type _storage_type;
+    TUniqueId _fragment_instance_id;
+    const std::vector<ExprContext*>& _output_expr_ctxs;
+
+    // If the result file format is plain text, like CSV, this _file_writer is owned by this
+    // VFileResultWriter. If the result file format is Parquet, it is owned by _parquet_writer.
+    std::unique_ptr<FileWriter> _file_writer_impl;
+    // parquet file writer
+    ParquetWriterWrapper* _parquet_writer = nullptr;
+    // Used to buffer the export data of plain text.
+    // TODO(cmy): A stringstream is used to buffer the data, to avoid calling the file writer's
+    // write() for every single row. But this cannot solve the problem of a row of data that is
+    // too large. For example, bitmap_to_string() may return a large volume of data.
+    // The speed is also relatively low; in my test it was about 6.5MB/s.
+    std::stringstream _plain_text_outstream;
+    static const size_t OUTSTREAM_BUFFER_SIZE_BYTES;
+
+    // current written bytes, used for split data
+    int64_t _current_written_bytes = 0;
+    // the suffix idx of export file name, start at 0
+    int _file_idx = 0;
+
+    RuntimeProfile* _parent_profile; // profile from result sink, not owned
+    // total time cost on append batch operation
+    RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
+    // tuple convert timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
+    // file write timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _file_write_timer = nullptr;
+    // time of closing the file writer
+    RuntimeProfile::Counter* _writer_close_timer = nullptr;
+    // number of written rows
+    RuntimeProfile::Counter* _written_rows_counter = nullptr;
+    // bytes of written data
+    RuntimeProfile::Counter* _written_data_bytes = nullptr;
+
+    // _sinker and _output_block are not owned by VFileResultWriter
+    BufferControlBlock* _sinker = nullptr;
+    Block* _output_block = nullptr;
+    // set to true if the final statistic result is sent
+    bool _is_result_sent = false;
+    // reset whenever a new data file name is generated
+    bool _header_sent = false;
+    RowDescriptor _output_row_descriptor;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 249f70ac0f..cbadfc931f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -290,6 +290,41 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_name = "VDataStreamSender";
}
+VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+ const std::vector<TPlanFragmentDestination>& destinations,
+ int per_channel_buffer_size,
+ bool send_query_statistics_with_every_batch)
+ : _sender_id(sender_id),
+ _pool(pool),
+ _row_desc(row_desc),
+ _current_channel_idx(0),
+ _ignore_not_found(true),
+ _cur_pb_block(&_pb_block1),
+ _profile(nullptr),
+ _serialize_batch_timer(nullptr),
+ _bytes_sent_counter(nullptr),
+ _local_bytes_send_counter(nullptr),
+ _dest_node_id(0) {
+ _name = "VDataStreamSender";
+}
+
+// Constructor for a sender with no destinations and sender id 0 (used by the top-level
+// VResultFileSink). It delegates to the destination-taking constructor with an empty list,
+// which performs exactly the same member initialization.
+VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
+                                     int per_channel_buffer_size,
+                                     bool send_query_statistics_with_every_batch)
+        : VDataStreamSender(pool, 0, row_desc, std::vector<TPlanFragmentDestination>(),
+                            per_channel_buffer_size, send_query_statistics_with_every_batch) {}
+
VDataStreamSender::~VDataStreamSender() {
_channel_shared_ptrs.clear();
}
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index d62ebeb684..67faae452b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -45,13 +45,20 @@ namespace vectorized {
class VExprContext;
class VPartitionInfo;
-class VDataStreamSender final : public DataSink {
+class VDataStreamSender : public DataSink {
public:
VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
const TDataStreamSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+ VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+ const std::vector<TPlanFragmentDestination>& destinations,
+ int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+
+ VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size,
+ bool send_query_statistics_with_every_batch);
+
~VDataStreamSender();
virtual Status init(const TDataSink& thrift_sink) override;
@@ -69,10 +76,8 @@ public:
Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
-private:
+protected:
void _roll_pb_block();
-
-private:
class Channel;
Status get_partition_column_result(Block* block, int* result) const {
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
new file mode 100644
index 0000000000..6d8d994585
--- /dev/null
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vresult_file_sink.h"
+
+#include "common/config.h"
+#include "exprs/expr.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/exec_env.h"
+#include "runtime/file_result_writer.h"
+#include "runtime/result_buffer_mgr.h"
+#include "runtime/result_file_sink.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "util/uid_util.h"
+#include "vec/runtime/vfile_result_writer.h"
+
+namespace doris::vectorized {
+
+// Top-level file sink: results are reported through a result buffer that prepare()
+// creates, so this constructor sets up no destination channels.
+VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink, int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr)
+        : VDataStreamSender(pool, row_desc, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr) {
+    _name = "VResultFileSink";
+    _is_top_sink = true;
+
+    // Both thrift fields are mandatory for a file sink.
+    CHECK(sink.__isset.file_options);
+    _file_opts = std::make_unique<ResultFileOptions>(sink.file_options);
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+}
+
+// Non-top file sink: besides writing the files, it forwards a block laid out by
+// `sink.output_tuple_id` to exactly one downstream fragment through a single channel.
+VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink,
+                                 const std::vector<TPlanFragmentDestination>& destinations,
+                                 int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
+        : VDataStreamSender(pool, sender_id, row_desc, destinations, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr),
+          _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = false;
+    // destinations[0] is dereferenced unconditionally below, so this invariant must also
+    // hold in release builds: a DCHECK would compile away and leave an out-of-bounds read.
+    CHECK_EQ(destinations.size(), 1);
+    // The channel is owned by _channel_shared_ptrs; _channels keeps a raw alias to it.
+    _channel_shared_ptrs.emplace_back(new Channel(
+            this, _output_row_descriptor, destinations[0].brpc_server,
+            destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true));
+    _channels.push_back(_channel_shared_ptrs.back().get());
+
+    _name = "VResultFileSink";
+}
+
+// Nothing to parse here: the constructors already consumed the TResultFileSink.
+Status VResultFileSink::init(const TDataSink& /*tsink*/) {
+    return Status::OK();
+}
+
+// Builds the output expression contexts from the thrift expressions (in the runtime
+// state's object pool) and prepares them against this sink's input row layout.
+Status VResultFileSink::prepare_exprs(RuntimeState* state) {
+    auto* const expr_pool = state->obj_pool();
+    RETURN_IF_ERROR(Expr::create_expr_trees(expr_pool, _t_output_expr, &_output_expr_ctxs));
+    return Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker);
+}
+
+// Creates the runtime profile, prepares the output expressions and builds the file writer.
+// A top sink also registers a result buffer with the result manager; a non-top sink sets
+// up its serialization/transfer counters and the block handed to the writer, then
+// initializes its destination channels.
+Status VResultFileSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSink::prepare(state));
+    std::stringstream title;
+    title << "VResultFileSink (fragment_instance_id=" << print_id(state->fragment_instance_id())
+          << ")";
+    // Allocate the profile exactly once: a top sink keeps it in the runtime state's pool,
+    // a non-top sink in the sink's own pool.
+    ObjectPool* profile_pool = _is_top_sink ? state->obj_pool() : _pool;
+    _profile = profile_pool->add(new RuntimeProfile(title.str()));
+    RETURN_IF_ERROR(prepare_exprs(state));
+
+    CHECK(_file_opts.get() != nullptr);
+    if (_is_top_sink) {
+        // Register the result buffer the writer reports to.
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), _buf_size, &_sender));
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, _sender.get(), nullptr, state->return_object_data_as_binary(),
+                _output_row_descriptor));
+    } else {
+        _state = state;
+        _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
+        _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+        _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
+        _uncompressed_bytes_counter =
+                ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
+        // The writer gets this block instead of a result buffer; close() ships it downstream.
+        _output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1));
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(),
+                _output_row_descriptor));
+    }
+    // new (std::nothrow) yields nullptr on allocation failure; fail instead of dereferencing it.
+    if (_writer == nullptr) {
+        return Status::InternalError("Failed to allocate VFileResultWriter");
+    }
+    RETURN_IF_ERROR(_writer->init(state));
+    for (auto channel : _channels) {
+        RETURN_IF_ERROR(channel->init(state));
+    }
+    return Status::OK();
+}
+
+// Opens the output expression contexts built in prepare().
+Status VResultFileSink::open(RuntimeState* state) {
+    RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
+    return Status::OK();
+}
+
+// Row-based input is not supported by this vectorized sink; callers must use
+// send(RuntimeState*, Block*). The message names this method rather than get_next.
+Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) {
+    return Status::NotSupported("Not Implemented VResultFileSink::send(RowBatch)");
+}
+
+// Hands the block to the file writer and returns the writer's status unchanged.
+Status VResultFileSink::send(RuntimeState* state, Block* block) {
+    return _writer->append_block(*block);
+}
+
+// Closes the file writer, then either closes the result buffer sender (top sink) or ships
+// the writer's output block and closes the destination channels (non-top sink). Later
+// calls return OK immediately.
+//
+// A top sink reports failures through _sender and returns OK. A non-top sink returns the
+// first error hit while closing: a writer close failure (when exec_status is OK), a failure
+// to serialize/send the output block, or a channel close/close_wait failure. Channels are
+// always closed and expressions always released, even when an earlier step failed.
+Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    Status final_status = exec_status;
+    if (_writer) {
+        Status st = _writer->close();
+        if (!st.ok() && exec_status.ok()) {
+            // Closing the file writer failed; this error must be reported.
+            final_status = st;
+        }
+    }
+
+    Status close_status = Status::OK();
+    if (_is_top_sink) {
+        // Normal end of the top-sink path: hand the final status to the result buffer.
+        if (_sender) {
+            _sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
+            _sender->close(final_status);
+        }
+        state->exec_env()->result_mgr()->cancel_at_time(
+                time(nullptr) + config::result_buffer_cancelled_interval_time,
+                state->fragment_instance_id());
+    } else {
+        // A writer failure is this sink's own error; an error already carried in
+        // exec_status is not reported a second time.
+        if (exec_status.ok()) {
+            close_status = final_status;
+        }
+        if (final_status.ok() && _output_block != nullptr) {
+            close_status = serialize_block(_output_block.get(), _cur_pb_block, _channels.size());
+            if (close_status.ok()) {
+                for (auto channel : _channels) {
+                    close_status = channel->send_block(_cur_pb_block);
+                    if (!close_status.ok()) {
+                        break;
+                    }
+                }
+            }
+        }
+        // Close every channel even if sending the output block failed.
+        for (auto channel : _channels) {
+            Status st = channel->close(state);
+            if (!st.ok() && close_status.ok()) {
+                close_status = st;
+            }
+        }
+        // Wait for all channels to finish.
+        for (auto channel : _channels) {
+            Status st = channel->close_wait(state);
+            if (!st.ok() && close_status.ok()) {
+                close_status = st;
+            }
+        }
+        // The block is absent if prepare() failed before creating it.
+        if (_output_block != nullptr) {
+            _output_block->clear();
+        }
+    }
+
+    Expr::close(_output_expr_ctxs, state);
+
+    _closed = true;
+    return close_status;
+}
+
+// Routes query statistics: a non-top sink keeps them in the inherited _query_statistics
+// member, while the top sink hands them to its result buffer sender.
+void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
+    if (!_is_top_sink) {
+        _query_statistics = statistics;
+        return;
+    }
+    _sender->set_query_statistics(statistics);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
new file mode 100644
index 0000000000..e924883b42
--- /dev/null
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/result_file_sink.h"
+#include "vec/sink/vdata_stream_sender.h"
+
+namespace doris {
+namespace vectorized {
+class VResultWriter;
+
+class VResultFileSink : public VDataStreamSender {
+public:
+    // Top sink: the files are written by this fragment and the outcome is reported
+    // through a result buffer (BufferControlBlock) created in prepare().
+    VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr);
+    // Non-top sink: the files are written by this fragment and a block laid out by
+    // `sink.output_tuple_id` is sent to the single fragment listed in `destinations`.
+    VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                    const TResultFileSink& sink,
+                    const std::vector<TPlanFragmentDestination>& destinations,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
+    virtual ~VResultFileSink() = default;
+
+    // No-op: all configuration comes from the TResultFileSink given to the constructor.
+    Status init(const TDataSink& thrift_sink) override;
+    // Prepares the output expressions and creates the file writer, plus the result buffer
+    // (top sink) or the channel counters and output block (non-top sink).
+    Status prepare(RuntimeState* state) override;
+    // Opens the output expressions.
+    Status open(RuntimeState* state) override;
+    // Row-based input is not supported; always returns NotSupported.
+    Status send(RuntimeState* state, RowBatch* batch) override;
+    // Appends `block` to the file writer.
+    Status send(RuntimeState* state, Block* block) override;
+    // Closes the writer, then the result buffer (top sink) or the destination channels
+    // (non-top sink). Repeated calls after a completed close() return OK; send() must not
+    // be called after close().
+    Status close(RuntimeState* state, Status exec_status) override;
+    RuntimeProfile* profile() override { return _profile; }
+
+    // A top sink forwards the statistics to its result buffer; a non-top sink stores them
+    // in the inherited _query_statistics.
+    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
+
+private:
+    // Builds and prepares _output_expr_ctxs from _t_output_expr.
+    Status prepare_exprs(RuntimeState* state);
+
+    // Output file options, copied from TResultFileSink::file_options in the constructor.
+    std::unique_ptr<ResultFileOptions> _file_opts;
+    // Storage backend type taken from TResultFileSink::storage_backend_type.
+    TStorageBackendType::type _storage_type;
+
+    // Only a reference to the caller's thrift expressions is kept, so that vector must
+    // outlive this sink.
+    const std::vector<TExpr>& _t_output_expr;
+    // Created from _t_output_expr in prepare(), in the RuntimeState's object pool.
+    std::vector<ExprContext*> _output_expr_ctxs;
+    // Layout of the block sent downstream; only initialized for a non-top sink.
+    RowDescriptor _output_row_descriptor;
+
+    // Block given to the writer and sent to the destination in close() (non-top sink only).
+    std::unique_ptr<Block> _output_block = nullptr;
+    // Result buffer of a top sink; null for a non-top sink.
+    std::shared_ptr<BufferControlBlock> _sender;
+    std::shared_ptr<VResultWriter> _writer;
+    // Size passed to the result buffer (top sink) or to the channel (non-top sink).
+    int _buf_size = 1024;
+    bool _is_top_sink = true;
+};
+} // namespace vectorized
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org