You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/10 01:15:59 UTC

[incubator-doris] branch master updated: [feature](vectorized) Support outfile on vectorized engine (#10013)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1220cc147d [feature](vectorized) Support outfile on vectorized engine (#10013)
1220cc147d is described below

commit 1220cc147dc7a51ada5523ae84c4f20ce7610098
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Jun 10 09:15:53 2022 +0800

    [feature](vectorized) Support outfile on vectorized engine (#10013)
    
    This PR supports output csv format file on vectorized engine.
    
    ** Parquet is still not supported. **
---
 be/src/exec/data_sink.cpp                  |  33 +-
 be/src/vec/CMakeLists.txt                  |   2 +
 be/src/vec/runtime/vfile_result_writer.cpp | 485 +++++++++++++++++++++++++++++
 be/src/vec/runtime/vfile_result_writer.h   | 127 ++++++++
 be/src/vec/sink/vdata_stream_sender.cpp    |  35 +++
 be/src/vec/sink/vdata_stream_sender.h      |  13 +-
 be/src/vec/sink/vresult_file_sink.cpp      | 210 +++++++++++++
 be/src/vec/sink/vresult_file_sink.h        |  72 +++++
 8 files changed, 968 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index f6c1d21aa7..ef93a0c354 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -41,6 +41,7 @@
 #include "vec/sink/vdata_stream_sender.h"
 #include "vec/sink/vmysql_table_sink.h"
 #include "vec/sink/vmysql_table_writer.h"
+#include "vec/sink/vresult_file_sink.h"
 #include "vec/sink/vtablet_sink.h"
 
 namespace doris {
@@ -94,13 +95,35 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         if (!thrift_sink.__isset.result_file_sink) {
             return Status::InternalError("Missing result file sink.");
         }
-        // Result file sink is not the top sink
-        if (params.__isset.destinations && params.destinations.size() > 0) {
-            tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
-                                          params.destinations, pool, params.sender_id, desc_tbl);
+
+        // TODO: figure out good buffer size based on size of output row
+        if (is_vec) {
+            bool send_query_statistics_with_every_batch =
+                    params.__isset.send_query_statistics_with_every_batch
+                            ? params.send_query_statistics_with_every_batch
+                            : false;
+            // Result file sink is not the top sink
+            if (params.__isset.destinations && params.destinations.size() > 0) {
+                tmp_sink = new doris::vectorized::VResultFileSink(
+                        pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
+                        params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
+                        output_exprs, desc_tbl);
+            } else {
+                tmp_sink = new doris::vectorized::VResultFileSink(
+                        pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                        send_query_statistics_with_every_batch, output_exprs);
+            }
         } else {
-            tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
+            // Result file sink is not the top sink
+            if (params.__isset.destinations && params.destinations.size() > 0) {
+                tmp_sink =
+                        new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
+                                           params.destinations, pool, params.sender_id, desc_tbl);
+            } else {
+                tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
+            }
         }
+
         sink->reset(tmp_sink);
         break;
     }
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index d42b12e5bb..7862172c2c 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -191,9 +191,11 @@ set(VEC_FILES
   sink/vtablet_sink.cpp
   sink/vmysql_table_writer.cpp
   sink/vmysql_table_sink.cpp
+  sink/vresult_file_sink.cpp
   runtime/vdatetime_value.cpp
   runtime/vdata_stream_recvr.cpp
   runtime/vdata_stream_mgr.cpp
+  runtime/vfile_result_writer.cpp
   runtime/vpartition_info.cpp
   utils/arrow_column_to_doris_column.cpp
   runtime/vsorted_run_merger.cpp)
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
new file mode 100644
index 0000000000..f5aa53f10d
--- /dev/null
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -0,0 +1,485 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "common/consts.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "io/file_factory.h"
+#include "io/file_writer.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+}
+
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    return _create_next_file_writer();
+}
+
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime");
+    _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _close_file_writer(true);
+}
+
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = ss.str();
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&file_name));
+    return _create_file_writer(file_name);
+}
+
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    RETURN_IF_ERROR(FileFactory::create_file_writer(
+            FileFactory::convert_storage_type(_storage_type), _state->exec_env(),
+            _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0,
+            _file_writer_impl));
+    RETURN_IF_ERROR(_file_writer_impl->open());
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        // just use file writer is enough
+        break;
+    case TFileFormatType::FORMAT_PARQUET:
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+        break;
+    default:
+        return Status::InternalError(
+                strings::Substitute("unsupported file format: $0", _file_opts->file_format));
+    }
+    LOG(INFO) << "create file for exporting query result. file name: " << file_name
+              << ". query id: " << print_id(_state->query_id())
+              << " format:" << _file_opts->file_format;
+    return Status::OK();
+}
+
+// file name format as: my_prefix_{fragment_instance_id}_0.csv
+Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "."
+       << _file_format_to_name();
+    *file_name = ss.str();
+    _header_sent = false;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+// file url format as:
+// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
+// S3: {file_path}{fragment_instance_id}_
+// BROKER: {file_path}{fragment_instance_id}_
+
+Status VFileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream ss;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        ss << "file:///" << BackendOptions::get_localhost();
+    }
+    ss << _file_opts->file_path;
+    ss << print_id(_fragment_instance_id) << "_";
+    *file_url = ss.str();
+    return Status::OK();
+}
+
+std::string VFileResultWriter::_file_format_to_name() {
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        return "csv";
+    case TFileFormatType::FORMAT_PARQUET:
+        return "parquet";
+    default:
+        return "unknown";
+    }
+}
+
+Status VFileResultWriter::append_block(Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(write_csv_header());
+    SCOPED_TIMER(_append_row_batch_timer);
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else {
+        RETURN_IF_ERROR(_write_csv_file(block));
+    }
+
+    _written_rows += block.rows();
+    return Status::OK();
+}
+
+Status VFileResultWriter::_write_csv_file(const Block& block) {
+    for (size_t i = 0; i < block.rows(); i++) {
+        for (size_t col_id = 0; col_id < block.columns(); col_id++) {
+            auto col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>(
+                            col.column->get_data_at(i).data);
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example: For a double value 27361919854.929001,
+                    // the direct output of using std::stringstream is 2.73619e+10,
+                    // and after conversion to a string, it outputs 27361919854.929001
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const VecDateTimeValue* time_val =
+                            (const VecDateTimeValue*)(col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL:
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    std::string decimal_str;
+                    int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
+                    decimal_str = decimal_val.to_string(output_scale);
+                    _plain_text_outstream << decimal_str;
+                    break;
+                }
+                default: {
+                    // not supported type, like BITMAP, HLL, just export null
+                    _plain_text_outstream << NULL_IN_CSV;
+                }
+                }
+            }
+            if (col_id < block.columns() - 1) {
+                _plain_text_outstream << _file_opts->column_separator;
+            }
+        }
+        _plain_text_outstream << _file_opts->line_delimiter;
+    }
+
+    return _flush_plain_text_outstream(true);
+}
+
+std::string VFileResultWriter::gen_types() {
+    std::string types = "";
+    int num_columns = _output_expr_ctxs.size();
+    for (int i = 0; i < num_columns; ++i) {
+        types += type_to_string(_output_expr_ctxs[i]->root()->type().type);
+        if (i < num_columns - 1) {
+            types += _file_opts->column_separator;
+        }
+    }
+    types += _file_opts->line_delimiter;
+    return types;
+}
+
+Status VFileResultWriter::write_csv_header() {
+    if (!_header_sent && _header.size() > 0) {
+        std::string tmp_header = _header;
+        if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+            tmp_header += gen_types();
+        }
+        size_t written_len = 0;
+        RETURN_IF_ERROR(
+                _file_writer_impl->write(reinterpret_cast<const uint8_t*>(tmp_header.c_str()),
+                                         tmp_header.size(), &written_len));
+        _header_sent = true;
+    }
+    return Status::OK();
+}
+
+Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
+    SCOPED_TIMER(_file_write_timer);
+    size_t pos = _plain_text_outstream.tellp();
+    if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
+        return Status::OK();
+    }
+
+    const std::string& buf = _plain_text_outstream.str();
+    size_t written_len = 0;
+    RETURN_IF_ERROR(_file_writer_impl->write(reinterpret_cast<const uint8_t*>(buf.c_str()),
+                                             buf.size(), &written_len));
+    COUNTER_UPDATE(_written_data_bytes, written_len);
+    _current_written_bytes += written_len;
+
+    // clear the stream
+    _plain_text_outstream.str("");
+    _plain_text_outstream.clear();
+
+    // split file if exceed limit
+    return _create_new_file_if_exceed_size();
+}
+
+Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    if (_current_written_bytes < _file_opts->max_file_size_bytes) {
+        return Status::OK();
+    }
+    // current file size exceed the max file size. close this file
+    // and create new one
+    {
+        SCOPED_TIMER(_writer_close_timer);
+        RETURN_IF_ERROR(_close_file_writer(false));
+    }
+    _current_written_bytes = 0;
+    return Status::OK();
+}
+
+Status VFileResultWriter::_close_file_writer(bool done) {
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else if (_file_writer_impl) {
+        _file_writer_impl->close();
+    }
+
+    if (!done) {
+        // not finished, create new file writer for next file
+        RETURN_IF_ERROR(_create_next_file_writer());
+    } else {
+        // All data is written to file, send statistic result
+        if (_file_opts->success_file_name != "") {
+            // write success file, just need to touch an empty file
+            RETURN_IF_ERROR(_create_success_file());
+        }
+        if (_output_block == nullptr) {
+            RETURN_IF_ERROR(_send_result());
+        } else {
+            RETURN_IF_ERROR(_fill_result_block());
+        }
+    }
+    return Status::OK();
+}
+
+Status VFileResultWriter::_send_result() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+    // The final stat result include:
+    // FileNumber, TotalRows, FileSize and URL
+    // The type of these field should be conssitent with types defined
+    // in OutFileClause.java of FE.
+    MysqlRowBuffer row_buffer;
+    row_buffer.push_int(_file_idx);                         // file number
+    row_buffer.push_bigint(_written_rows_counter->value()); // total rows
+    row_buffer.push_bigint(_written_data_bytes->value());   // file size
+    std::string file_url;
+    _get_file_url(&file_url);
+    row_buffer.push_string(file_url.c_str(), file_url.length()); // url
+
+    std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
+    result->result_batch.rows.resize(1);
+    result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
+    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
+    return Status::OK();
+}
+
+Status VFileResultWriter::_fill_result_block() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+#ifndef INSERT_TO_COLUMN
+#define INSERT_TO_COLUMN                                                            \
+    if (i == 0) {                                                                   \
+        column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0);          \
+    } else if (i == 1) {                                                            \
+        int64_t written_rows = _written_rows_counter->value();                      \
+        column->insert_data(reinterpret_cast<const char*>(&written_rows), 0);       \
+    } else if (i == 2) {                                                            \
+        int64_t written_data_bytes = _written_data_bytes->value();                  \
+        column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \
+    } else if (i == 3) {                                                            \
+        std::string file_url;                                                       \
+        _get_file_url(&file_url);                                                   \
+        column->insert_data(file_url.c_str(), file_url.size());                     \
+    }                                                                               \
+    _output_block->replace_by_position(i, std::move(column));
+#endif
+
+    for (int i = 0; i < _output_block->columns(); i++) {
+        switch (_output_row_descriptor.tuple_descriptors()[0]->slots()[i]->type().type) {
+        case TYPE_INT: {
+            auto column = ColumnVector<int32_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_BIGINT: {
+            auto column = ColumnVector<int64_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_LARGEINT: {
+            auto column = ColumnVector<int128_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_SMALLINT: {
+            auto column = ColumnVector<int16_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_TINYINT: {
+            auto column = ColumnVector<int8_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_VARCHAR:
+        case TYPE_CHAR:
+        case TYPE_STRING: {
+            auto column = ColumnVector<int8_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        default:
+            return Status::InternalError(strings::Substitute(
+                    "Invalid type to print: $0",
+                    _output_row_descriptor.tuple_descriptors()[0]->slots()[i]->type().type));
+        }
+    }
+    return Status::OK();
+}
+
+Status VFileResultWriter::close() {
+    // the following 2 profile "_written_rows_counter" and "_writer_close_timer"
+    // must be outside the `_close_file_writer()`.
+    // because `_close_file_writer()` may be called in deconstructor,
+    // at that time, the RuntimeState may already been deconstructed,
+    // so does the profile in RuntimeState.
+    COUNTER_SET(_written_rows_counter, _written_rows);
+    SCOPED_TIMER(_writer_close_timer);
+    return _close_file_writer(true);
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h
new file mode 100644
index 0000000000..abfac874c1
--- /dev/null
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/file_writer.h"
+#include "runtime/file_result_writer.h"
+#include "vec/sink/result_sink.h"
+
+namespace doris {
+
+namespace vectorized {
+// write result to file
+class VFileResultWriter final : public VResultWriter {
+public:
+    VFileResultWriter(const ResultFileOptions* file_option,
+                      const TStorageBackendType::type storage_type,
+                      const TUniqueId fragment_instance_id,
+                      const std::vector<ExprContext*>& output_expr_ctxs,
+                      RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                      Block* output_block, bool output_object_data,
+                      const RowDescriptor& output_row_descriptor);
+    virtual ~VFileResultWriter() = default;
+
+    virtual Status append_block(Block& block) override;
+    virtual Status append_row_batch(const RowBatch* batch) override {
+        return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!");
+    };
+
+    virtual Status init(RuntimeState* state) override;
+    virtual Status close() override;
+
+    // file result writer always return statistic result in one row
+    virtual int64_t get_written_rows() const override { return 1; }
+
+    std::string gen_types();
+    Status write_csv_header();
+
+private:
+    Status _write_csv_file(const Block& block);
+
+    // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
+    // if eos, write the data even if buffer is not full.
+    Status _flush_plain_text_outstream(bool eos);
+    void _init_profile();
+
+    Status _create_file_writer(const std::string& file_name);
+    Status _create_next_file_writer();
+    Status _create_success_file();
+    // get next export file name
+    Status _get_next_file_name(std::string* file_name);
+    Status _get_success_file_name(std::string* file_name);
+    Status _get_file_url(std::string* file_url);
+    std::string _file_format_to_name();
+    // close file writer, and if !done, it will create new writer for next file.
+    // if only_close is true, this method will just close the file writer and return.
+    Status _close_file_writer(bool done);
+    // create a new file if current file size exceed limit
+    Status _create_new_file_if_exceed_size();
+    // send the final statistic result
+    Status _send_result();
+    // save result into batch rather than send it
+    Status _fill_result_block();
+
+    RuntimeState* _state; // not owned, set when init
+    const ResultFileOptions* _file_opts;
+    TStorageBackendType::type _storage_type;
+    TUniqueId _fragment_instance_id;
+    const std::vector<ExprContext*>& _output_expr_ctxs;
+
+    // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
+    // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
+    std::unique_ptr<FileWriter> _file_writer_impl;
+    // parquet file writer
+    ParquetWriterWrapper* _parquet_writer = nullptr;
+    // Used to buffer the export data of plain text
+    // TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid calling
+    // file writer's write() for every single row.
+    // But this cannot solve the problem of a row of data that is too large.
+    // For example: bitmap_to_string() may return large volumn of data.
+    // And the speed is relative low, in my test, is about 6.5MB/s.
+    std::stringstream _plain_text_outstream;
+    static const size_t OUTSTREAM_BUFFER_SIZE_BYTES;
+
+    // current written bytes, used for split data
+    int64_t _current_written_bytes = 0;
+    // the suffix idx of export file name, start at 0
+    int _file_idx = 0;
+
+    RuntimeProfile* _parent_profile; // profile from result sink, not owned
+    // total time cost on append batch operation
+    RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
+    // tuple convert timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
+    // file write timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _file_write_timer = nullptr;
+    // time of closing the file writer
+    RuntimeProfile::Counter* _writer_close_timer = nullptr;
+    // number of written rows
+    RuntimeProfile::Counter* _written_rows_counter = nullptr;
+    // bytes of written data
+    RuntimeProfile::Counter* _written_data_bytes = nullptr;
+
+    // _sinker and _output_batch are not owned by FileResultWriter
+    BufferControlBlock* _sinker = nullptr;
+    Block* _output_block = nullptr;
+    // set to true if the final statistic result is sent
+    bool _is_result_sent = false;
+    bool _header_sent = false;
+    RowDescriptor _output_row_descriptor;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index dfe698e2a9..5153327644 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -290,6 +290,41 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
     _name = "VDataStreamSender";
 }
 
+VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                                     const std::vector<TPlanFragmentDestination>& destinations,
+                                     int per_channel_buffer_size,
+                                     bool send_query_statistics_with_every_batch)
+        : _sender_id(sender_id),
+          _pool(pool),
+          _row_desc(row_desc),
+          _current_channel_idx(0),
+          _ignore_not_found(true),
+          _cur_pb_block(&_pb_block1),
+          _profile(nullptr),
+          _serialize_batch_timer(nullptr),
+          _bytes_sent_counter(nullptr),
+          _local_bytes_send_counter(nullptr),
+          _dest_node_id(0) {
+    _name = "VDataStreamSender";
+}
+
+VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
+                                     int per_channel_buffer_size,
+                                     bool send_query_statistics_with_every_batch)
+        : _sender_id(0),
+          _pool(pool),
+          _row_desc(row_desc),
+          _current_channel_idx(0),
+          _ignore_not_found(true),
+          _cur_pb_block(&_pb_block1),
+          _profile(nullptr),
+          _serialize_batch_timer(nullptr),
+          _bytes_sent_counter(nullptr),
+          _local_bytes_send_counter(nullptr),
+          _dest_node_id(0) {
+    _name = "VDataStreamSender";
+}
+
 VDataStreamSender::~VDataStreamSender() {
     _channel_shared_ptrs.clear();
 }
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 155ed53277..5a8d5d25cd 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -45,13 +45,20 @@ namespace vectorized {
 class VExprContext;
 class VPartitionInfo;
 
-class VDataStreamSender final : public DataSink {
+class VDataStreamSender : public DataSink {
 public:
     VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
                       const TDataStreamSink& sink,
                       const std::vector<TPlanFragmentDestination>& destinations,
                       int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
 
+    VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                      const std::vector<TPlanFragmentDestination>& destinations,
+                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+
+    VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size,
+                      bool send_query_statistics_with_every_batch);
+
     ~VDataStreamSender();
 
     virtual Status init(const TDataSink& thrift_sink) override;
@@ -69,10 +76,8 @@ public:
 
     Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
 
-private:
+protected:
     void _roll_pb_block();
-
-private:
     class Channel;
 
     Status get_partition_column_result(Block* block, int* result) const {
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
new file mode 100644
index 0000000000..b69226bd7e
--- /dev/null
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vresult_file_sink.h"
+
+#include "common/config.h"
+#include "exprs/expr.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/exec_env.h"
+#include "runtime/file_result_writer.h"
+#include "runtime/result_buffer_mgr.h"
+#include "runtime/result_file_sink.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "util/uid_util.h"
+#include "vec/runtime/vfile_result_writer.h"
+
+namespace doris::vectorized {
+
+VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink, int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr)
+        : VDataStreamSender(pool, row_desc, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = true;
+
+    _name = "VResultFileSink";
+    //for impl csv_with_name and csv_with_names_and_types
+    _header_type = sink.header_type;
+    _header = sink.header;
+}
+
+VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink,
+                                 const std::vector<TPlanFragmentDestination>& destinations,
+                                 int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
+        : VDataStreamSender(pool, sender_id, row_desc, destinations, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr),
+          _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = false;
+    DCHECK_EQ(destinations.size(), 1);
+    _channel_shared_ptrs.emplace_back(new Channel(
+            this, _output_row_descriptor, destinations[0].brpc_server,
+            destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true));
+    _channels.push_back(_channel_shared_ptrs.back().get());
+
+    _name = "VResultFileSink";
+    //for impl csv_with_name and csv_with_names_and_types
+    _header_type = sink.header_type;
+    _header = sink.header;
+}
+
+Status VResultFileSink::init(const TDataSink& tsink) {
+    return Status::OK();
+}
+
+Status VResultFileSink::prepare_exprs(RuntimeState* state) {
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker));
+    return Status::OK();
+}
+
+Status VResultFileSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSink::prepare(state));
+    std::stringstream title;
+    title << "VResultFileSink (fragment_instance_id=" << print_id(state->fragment_instance_id())
+          << ")";
+    // create profile
+    _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
+    // prepare output_expr
+    RETURN_IF_ERROR(prepare_exprs(state));
+
+    CHECK(_file_opts.get() != nullptr);
+    if (_is_top_sink) {
+        // create sender
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), _buf_size, &_sender));
+        // create writer
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, _sender.get(), nullptr, state->return_object_data_as_binary(),
+                _output_row_descriptor));
+    } else {
+        // init channel
+        _profile = _pool->add(new RuntimeProfile(title.str()));
+        _state = state;
+        _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
+        _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+        _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
+        _uncompressed_bytes_counter =
+                ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
+        // create writer
+        _output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1));
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(),
+                _output_row_descriptor));
+    }
+    _writer->set_header_info(_header_type, _header);
+    RETURN_IF_ERROR(_writer->init(state));
+    for (int i = 0; i < _channels.size(); ++i) {
+        RETURN_IF_ERROR(_channels[i]->init(state));
+    }
+    return Status::OK();
+}
+
+Status VResultFileSink::open(RuntimeState* state) {
+    return Expr::open(_output_expr_ctxs, state);
+}
+
+Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) {
+    return Status::NotSupported("Not Implemented VResultFileSink Node::get_next scalar");
+}
+
+Status VResultFileSink::send(RuntimeState* state, Block* block) {
+    RETURN_IF_ERROR(_writer->append_block(*block));
+    return Status::OK();
+}
+
+Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    Status final_status = exec_status;
+    // close the writer
+    if (_writer) {
+        Status st = _writer->close();
+        if (!st.ok() && exec_status.ok()) {
+            // close file writer failed, should return this error to client
+            final_status = st;
+        }
+    }
+    if (_is_top_sink) {
+        // close sender, this is normal path end
+        if (_sender) {
+            _sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
+            _sender->close(final_status);
+        }
+        state->exec_env()->result_mgr()->cancel_at_time(
+                time(nullptr) + config::result_buffer_cancelled_interval_time,
+                state->fragment_instance_id());
+    } else {
+        if (final_status.ok()) {
+            RETURN_IF_ERROR(serialize_block(_output_block.get(), _cur_pb_block, _channels.size()));
+            for (auto channel : _channels) {
+                RETURN_IF_ERROR(channel->send_block(_cur_pb_block));
+            }
+        }
+        Status final_st = Status::OK();
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        // wait all channels to finish
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close_wait(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        _output_block->clear();
+    }
+
+    Expr::close(_output_expr_ctxs, state);
+
+    _closed = true;
+    return Status::OK();
+}
+
+void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
+    if (_is_top_sink) {
+        _sender->set_query_statistics(statistics);
+    } else {
+        _query_statistics = statistics;
+    }
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
new file mode 100644
index 0000000000..89085a71e3
--- /dev/null
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/result_file_sink.h"
+#include "vec/sink/vdata_stream_sender.h"
+
+namespace doris {
+namespace vectorized {
+class VResultWriter;
+
+class VResultFileSink : public VDataStreamSender {
+public:
+    VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr);
+    VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                    const TResultFileSink& sink,
+                    const std::vector<TPlanFragmentDestination>& destinations,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
+    virtual ~VResultFileSink() = default;
+    virtual Status init(const TDataSink& thrift_sink) override;
+    virtual Status prepare(RuntimeState* state) override;
+    virtual Status open(RuntimeState* state) override;
+    // send data in 'batch' to this backend stream mgr
+    // Blocks until all rows in batch are placed in the buffer
+    virtual Status send(RuntimeState* state, RowBatch* batch) override;
+    virtual Status send(RuntimeState* state, Block* block) override;
+    // Flush all buffered data and close all existing channels to destination
+    // hosts. Further send() calls are illegal after calling close().
+    virtual Status close(RuntimeState* state, Status exec_status) override;
+    virtual RuntimeProfile* profile() override { return _profile; }
+
+    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
+
+private:
+    Status prepare_exprs(RuntimeState* state);
+    // set file options when sink type is FILE
+    std::unique_ptr<ResultFileOptions> _file_opts;
+    TStorageBackendType::type _storage_type;
+
+    // Owned by the RuntimeState.
+    const std::vector<TExpr>& _t_output_expr;
+    std::vector<ExprContext*> _output_expr_ctxs;
+    RowDescriptor _output_row_descriptor;
+
+    std::unique_ptr<Block> _output_block = nullptr;
+    std::shared_ptr<BufferControlBlock> _sender;
+    std::shared_ptr<VResultWriter> _writer;
+    int _buf_size = 1024; // Allocated from _pool
+    bool _is_top_sink = true;
+    std::string _header;
+    std::string _header_type;
+};
+} // namespace vectorized
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org