You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2022/06/08 07:36:38 UTC

[GitHub] [incubator-doris] Gabriel39 opened a new pull request, #10013: [Feature] Support outfile on vectorized engine

Gabriel39 opened a new pull request, #10013:
URL: https://github.com/apache/incubator-doris/pull/10013

   # Proposed changes
   
   Issue Number: close #xxx
   
   ## Problem Summary:
   
   Describe the overview of changes.
   
   ## Checklist(Required)
   
   1. Does it affect the original behavior: (Yes/No/I Don't know)
   2. Has unit tests been added: (Yes/No/No Need)
   3. Has document been added or modified: (Yes/No/No Need)
   4. Does it need to update dependencies: (Yes/No)
   5. Are there any changes that cannot be rolled back: (Yes/No)
   
   ## Further comments
   
   If this is a relatively large or complex change, kick off the discussion at [dev@doris.apache.org](mailto:dev@doris.apache.org) by explaining why you chose the solution you did and what alternatives you considered, etc...
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#issuecomment-1151024187

   PR approved by anyone and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893218281


##########
be/src/vec/sink/vresult_file_sink.h:
##########
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/result_file_sink.h"
+#include "vec/sink/vdata_stream_sender.h"
+
+namespace doris {
+namespace vectorized {
+class VResultWriter;
+
+class VResultFileSink : public VDataStreamSender {
+public:
+    VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr);
+    VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                    const TResultFileSink& sink,
+                    const std::vector<TPlanFragmentDestination>& destinations,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
+    virtual ~VResultFileSink();
+    virtual Status init(const TDataSink& thrift_sink) override;
+    virtual Status prepare(RuntimeState* state) override;
+    virtual Status open(RuntimeState* state) override;
+    // send data in 'batch' to this backend stream mgr
+    // Blocks until all rows in batch are placed in the buffer
+    virtual Status send(RuntimeState* state, RowBatch* batch) override;
+    virtual Status send(RuntimeState* state, Block* block) override;
+    // Flush all buffered data and close all existing channels to destination
+    // hosts. Further send() calls are illegal after calling close().
+    virtual Status close(RuntimeState* state, Status exec_status) override;
+    virtual RuntimeProfile* profile() override { return _profile; }
+
+    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
+
+private:
+    Status prepare_exprs(RuntimeState* state);
+    // set file options when sink type is FILE
+    std::unique_ptr<ResultFileOptions> _file_opts;
+    TStorageBackendType::type _storage_type;
+
+    // Owned by the RuntimeState.
+    const std::vector<TExpr>& _t_output_expr;
+    std::vector<ExprContext*> _output_expr_ctxs;
+    RowDescriptor _output_row_descriptor;
+
+    Block* _output_block = nullptr;

Review Comment:
   use unique ptr please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] github-actions[bot] commented on pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#issuecomment-1151024167

   PR approved by at least one committer and no changes requested.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] morningman merged pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
morningman merged PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893223618


##########
be/src/vec/runtime/vfile_result_writer.cpp:
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "common/consts.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/raw_value.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/common/arena.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+}
+
+VFileResultWriter::~VFileResultWriter() {
+    _close_file_writer(true);
+}
+
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    return _create_next_file_writer();
+}
+
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime");
+    _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _close_file_writer(true, true);
+}
+
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = ss.str();
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&file_name));
+    return _create_file_writer(file_name);
+}
+
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
+    } else if (_storage_type == TStorageBackendType::BROKER) {
+        _file_writer =
+                new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
+                                 _file_opts->broker_properties, file_name, 0 /*start offset*/);
+    } else if (_storage_type == TStorageBackendType::S3) {
+        _file_writer = new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */);
+    } else if (_storage_type == TStorageBackendType::HDFS) {
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+                const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties),
+                file_name, &_file_writer));
+    }
+    RETURN_IF_ERROR(_file_writer->open());
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        // just use file writer is enough
+        break;
+    case TFileFormatType::FORMAT_PARQUET:
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+        break;
+    default:
+        return Status::InternalError(
+                strings::Substitute("unsupported file format: $0", _file_opts->file_format));
+    }
+    LOG(INFO) << "create file for exporting query result. file name: " << file_name
+              << ". query id: " << print_id(_state->query_id())
+              << " format:" << _file_opts->file_format;
+    return Status::OK();
+}
+
+// file name format as: my_prefix_{fragment_instance_id}_0.csv
+Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "."
+       << _file_format_to_name();
+    *file_name = ss.str();
+    _header_sent = false;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+// file url format as:
+// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
+// S3: {file_path}{fragment_instance_id}_
+// BROKER: {file_path}{fragment_instance_id}_
+
+Status VFileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream ss;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        ss << "file:///" << BackendOptions::get_localhost();
+    }
+    ss << _file_opts->file_path;
+    ss << print_id(_fragment_instance_id) << "_";
+    *file_url = ss.str();
+    return Status::OK();
+}
+
+std::string VFileResultWriter::_file_format_to_name() {
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        return "csv";
+    case TFileFormatType::FORMAT_PARQUET:
+        return "parquet";
+    default:
+        return "unknown";
+    }
+}
+
+Status VFileResultWriter::append_block(Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(write_csv_header());
+    SCOPED_TIMER(_append_row_batch_timer);
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else {
+        RETURN_IF_ERROR(_write_csv_file(block));
+    }
+
+    _written_rows += block.rows();
+    return Status::OK();
+}
+
+Status VFileResultWriter::_write_csv_file(const Block& block) {
+    for (size_t i = 0; i < block.rows(); i++) {
+        for (size_t col_id = 0; col_id < block.columns(); col_id++) {
+            auto col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>(
+                            col.column->get_data_at(i).data);
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example: For a double value 27361919854.929001,
+                    // the direct output of using std::stringstream is 2.73619e+10,
+                    // and after conversion to a string, it outputs 27361919854.929001
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const VecDateTimeValue* time_val =
+                            (const VecDateTimeValue*)(col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL:
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    std::string decimal_str;
+                    int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
+                    decimal_str = decimal_val.to_string(output_scale);
+                    _plain_text_outstream << decimal_str;
+                    break;
+                }
+                default: {
+                    // not supported type, like BITMAP, HLL, just export null
+                    _plain_text_outstream << NULL_IN_CSV;
+                }
+                }
+            }
+            if (col_id < block.columns() - 1) {
+                _plain_text_outstream << _file_opts->column_separator;
+            }
+        }
+        _plain_text_outstream << _file_opts->line_delimiter;
+    }
+
+    return _flush_plain_text_outstream(true);
+}
+
+std::string VFileResultWriter::gen_types() {
+    std::string types = "";
+    int num_columns = _output_expr_ctxs.size();
+    for (int i = 0; i < num_columns; ++i) {
+        types += type_to_string(_output_expr_ctxs[i]->root()->type().type);
+        if (i < num_columns - 1) {
+            types += _file_opts->column_separator;
+        }
+    }
+    types += _file_opts->line_delimiter;
+    return types;
+}
+
+Status VFileResultWriter::write_csv_header() {
+    if (!_header_sent && _header.size() > 0) {
+        std::string tmp_header = _header;
+        if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+            tmp_header += gen_types();
+        }
+        size_t written_len = 0;
+        RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const uint8_t*>(tmp_header.c_str()),
+                                            tmp_header.size(), &written_len));
+        _header_sent = true;
+    }
+    return Status::OK();
+}
+
+Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
+    SCOPED_TIMER(_file_write_timer);
+    size_t pos = _plain_text_outstream.tellp();
+    if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
+        return Status::OK();
+    }
+
+    const std::string& buf = _plain_text_outstream.str();
+    size_t written_len = 0;
+    RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const uint8_t*>(buf.c_str()), buf.size(),
+                                        &written_len));
+    COUNTER_UPDATE(_written_data_bytes, written_len);
+    _current_written_bytes += written_len;
+
+    // clear the stream
+    _plain_text_outstream.str("");
+    _plain_text_outstream.clear();
+
+    // split file if exceed limit
+    return _create_new_file_if_exceed_size();
+}
+
+Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    if (_current_written_bytes < _file_opts->max_file_size_bytes) {
+        return Status::OK();
+    }
+    // current file size exceed the max file size. close this file
+    // and create new one
+    {
+        SCOPED_TIMER(_writer_close_timer);
+        RETURN_IF_ERROR(_close_file_writer(false));
+    }
+    _current_written_bytes = 0;
+    return Status::OK();
+}
+
+Status VFileResultWriter::_close_file_writer(bool done, bool only_close) {

Review Comment:
   maybe we should split this method a some little method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893038957


##########
be/src/vec/runtime/vfile_result_writer.cpp:
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "common/consts.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/raw_value.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/common/arena.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+}
+
+VFileResultWriter::~VFileResultWriter() {
+    _close_file_writer(true);
+}
+
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    return _create_next_file_writer();
+}
+
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime");
+    _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _close_file_writer(true, true);
+}
+
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = ss.str();
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&file_name));
+    return _create_file_writer(file_name);
+}
+
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);

Review Comment:
   happenlee add a file factory, could use the factory to simplify the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893220089


##########
be/src/vec/sink/vresult_file_sink.cpp:
##########
@@ -0,0 +1,216 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vresult_file_sink.h"
+
+#include "common/config.h"
+#include "exprs/expr.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/exec_env.h"
+#include "runtime/file_result_writer.h"
+#include "runtime/result_buffer_mgr.h"
+#include "runtime/result_file_sink.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "util/uid_util.h"
+#include "vec/runtime/vfile_result_writer.h"
+
+namespace doris::vectorized {
+
+VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink, int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr)
+        : VDataStreamSender(pool, row_desc, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = true;
+
+    _name = "VResultFileSink";
+    //for impl csv_with_name and csv_with_names_and_types
+    _header_type = sink.header_type;
+    _header = sink.header;
+}
+
+VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink,
+                                 const std::vector<TPlanFragmentDestination>& destinations,
+                                 int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
+        : VDataStreamSender(pool, sender_id, row_desc, destinations, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr),
+          _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = false;
+    DCHECK_EQ(destinations.size(), 1);
+    _channel_shared_ptrs.emplace_back(new Channel(
+            this, _output_row_descriptor, destinations[0].brpc_server,
+            destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true));
+    _channels.push_back(_channel_shared_ptrs.back().get());
+
+    _name = "VResultFileSink";
+    //for impl csv_with_name and csv_with_names_and_types
+    _header_type = sink.header_type;
+    _header = sink.header;
+}
+
+VResultFileSink::~VResultFileSink() {
+    if (_output_block != nullptr) {
+        delete _output_block;
+    }
+}
+
+Status VResultFileSink::init(const TDataSink& tsink) {
+    return Status::OK();
+}
+
+Status VResultFileSink::prepare_exprs(RuntimeState* state) {
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker));
+    return Status::OK();
+}
+
+Status VResultFileSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSink::prepare(state));
+    std::stringstream title;
+    title << "VResultFileSink (fragment_instance_id=" << print_id(state->fragment_instance_id())
+          << ")";
+    // create profile
+    _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
+    // prepare output_expr
+    RETURN_IF_ERROR(prepare_exprs(state));
+
+    CHECK(_file_opts.get() != nullptr);
+    if (_is_top_sink) {
+        // create sender
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), _buf_size, &_sender));
+        // create writer
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, _sender.get(), nullptr, state->return_object_data_as_binary(),
+                _output_row_descriptor));
+    } else {
+        // init channel
+        _profile = _pool->add(new RuntimeProfile(title.str()));
+        _state = state;
+        _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
+        _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+        _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
+        _uncompressed_bytes_counter =
+                ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
+        // create writer
+        _output_block = new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1);
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, nullptr, _output_block, state->return_object_data_as_binary(),

Review Comment:
   Use unique ptr for _output_block and call get() to pass the raw pointer to writer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893226387


##########
be/src/vec/runtime/vfile_result_writer.h:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/file_result_writer.h"
+#include "vec/sink/result_sink.h"
+
+namespace doris {
+
+namespace vectorized {
+// write result to file
+class VFileResultWriter final : public VResultWriter {
+public:
+    VFileResultWriter(const ResultFileOptions* file_option,
+                      const TStorageBackendType::type storage_type,
+                      const TUniqueId fragment_instance_id,
+                      const std::vector<ExprContext*>& output_expr_ctxs,
+                      RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                      Block* output_block, bool output_object_data,
+                      const RowDescriptor& output_row_descriptor);
+    virtual ~VFileResultWriter();
+
+    virtual Status append_block(Block& block) override;
+    virtual Status append_row_batch(const RowBatch* batch) override {
+        return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!");
+    };
+
+    virtual Status init(RuntimeState* state) override;
+    virtual Status close() override;
+
+    // file result writer always return statistic result in one row
+    virtual int64_t get_written_rows() const override { return 1; }
+
+    std::string gen_types();
+    Status write_csv_header();
+
+private:
+    Status _write_csv_file(const Block& block);
+
+    // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
+    // if eos, write the data even if buffer is not full.
+    Status _flush_plain_text_outstream(bool eos);
+    void _init_profile();
+
+    Status _create_file_writer(const std::string& file_name);
+    Status _create_next_file_writer();
+    Status _create_success_file();
+    // get next export file name
+    Status _get_next_file_name(std::string* file_name);
+    Status _get_success_file_name(std::string* file_name);
+    Status _get_file_url(std::string* file_url);
+    std::string _file_format_to_name();
+    // close file writer, and if !done, it will create new writer for next file.
+    // if only_close is true, this method will just close the file writer and return.
+    Status _close_file_writer(bool done, bool only_close = false);
+    // create a new file if current file size exceed limit
+    Status _create_new_file_if_exceed_size();
+    // send the final statistic result
+    Status _send_result();
+    // save result into batch rather than send it
+    Status _fill_result_block();
+
+    RuntimeState* _state; // not owned, set when init
+    const ResultFileOptions* _file_opts;
+    TStorageBackendType::type _storage_type;
+    TUniqueId _fragment_instance_id;
+    const std::vector<ExprContext*>& _output_expr_ctxs;
+
+    // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
+    // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
+    FileWriter* _file_writer = nullptr;

Review Comment:
   Maybe use _file_output_stream because file writer is very similar to file result writer, very confused.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893203210


##########
be/src/vec/runtime/vfile_result_writer.cpp:
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "common/consts.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/raw_value.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/common/arena.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+}
+
+VFileResultWriter::~VFileResultWriter() {
+    _close_file_writer(true);
+}
+
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    return _create_next_file_writer();
+}
+
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime");
+    _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _close_file_writer(true, true);
+}
+
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {

Review Comment:
   Do not need a separate function here. Merge it with _create_success_file();



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893201778


##########
be/src/vec/runtime/vfile_result_writer.cpp:
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "common/consts.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/raw_value.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/common/arena.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+}
+
+VFileResultWriter::~VFileResultWriter() {
+    _close_file_writer(true);

Review Comment:
   close_file_writer maybe a very heavy work,  should not call it in a deconstructor.  Maybe we can check if it is closed here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893207944


##########
be/src/vec/runtime/vfile_result_writer.cpp:
##########
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "common/consts.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/raw_value.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "runtime/tuple_row.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/common/arena.h"
+#include "vec/core/block.h"
+#include "vec/core/types.h"
+
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+}
+
+VFileResultWriter::~VFileResultWriter() {
+    _close_file_writer(true);
+}
+
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    return _create_next_file_writer();
+}
+
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime");
+    _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _close_file_writer(true, true);
+}
+
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = ss.str();
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&file_name));
+    return _create_file_writer(file_name);
+}
+
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
+    } else if (_storage_type == TStorageBackendType::BROKER) {
+        _file_writer =
+                new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
+                                 _file_opts->broker_properties, file_name, 0 /*start offset*/);
+    } else if (_storage_type == TStorageBackendType::S3) {
+        _file_writer = new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */);
+    } else if (_storage_type == TStorageBackendType::HDFS) {
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+                const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties),
+                file_name, &_file_writer));
+    }
+    RETURN_IF_ERROR(_file_writer->open());
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        // just use file writer is enough
+        break;
+    case TFileFormatType::FORMAT_PARQUET:
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+        break;
+    default:
+        return Status::InternalError(
+                strings::Substitute("unsupported file format: $0", _file_opts->file_format));
+    }
+    LOG(INFO) << "create file for exporting query result. file name: " << file_name
+              << ". query id: " << print_id(_state->query_id())
+              << " format:" << _file_opts->file_format;
+    return Status::OK();
+}
+
+// file name format as: my_prefix_{fragment_instance_id}_0.csv
+Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "."
+       << _file_format_to_name();
+    *file_name = ss.str();
+    _header_sent = false;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+// file url format as:
+// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
+// S3: {file_path}{fragment_instance_id}_
+// BROKER: {file_path}{fragment_instance_id}_
+
+Status VFileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream ss;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        ss << "file:///" << BackendOptions::get_localhost();
+    }
+    ss << _file_opts->file_path;
+    ss << print_id(_fragment_instance_id) << "_";
+    *file_url = ss.str();
+    return Status::OK();
+}
+
+std::string VFileResultWriter::_file_format_to_name() {
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        return "csv";
+    case TFileFormatType::FORMAT_PARQUET:
+        return "parquet";
+    default:
+        return "unknown";
+    }
+}
+
+Status VFileResultWriter::append_block(Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(write_csv_header());
+    SCOPED_TIMER(_append_row_batch_timer);
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else {
+        RETURN_IF_ERROR(_write_csv_file(block));
+    }
+
+    _written_rows += block.rows();
+    return Status::OK();
+}
+
+Status VFileResultWriter::_write_csv_file(const Block& block) {
+    for (size_t i = 0; i < block.rows(); i++) {
+        for (size_t col_id = 0; col_id < block.columns(); col_id++) {
+            auto col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>(
+                            col.column->get_data_at(i).data);
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example: For a double value 27361919854.929001,
+                    // the direct output of using std::stringstream is 2.73619e+10,
+                    // and after conversion to a string, it outputs 27361919854.929001
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const VecDateTimeValue* time_val =
+                            (const VecDateTimeValue*)(col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL:
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    std::string decimal_str;
+                    int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
+                    decimal_str = decimal_val.to_string(output_scale);
+                    _plain_text_outstream << decimal_str;
+                    break;
+                }
+                default: {
+                    // not supported type, like BITMAP, HLL, just export null
+                    _plain_text_outstream << NULL_IN_CSV;
+                }
+                }
+            }
+            if (col_id < block.columns() - 1) {
+                _plain_text_outstream << _file_opts->column_separator;
+            }
+        }
+        _plain_text_outstream << _file_opts->line_delimiter;
+    }
+
+    return _flush_plain_text_outstream(true);
+}
+
+std::string VFileResultWriter::gen_types() {
+    std::string types = "";
+    int num_columns = _output_expr_ctxs.size();
+    for (int i = 0; i < num_columns; ++i) {
+        types += type_to_string(_output_expr_ctxs[i]->root()->type().type);
+        if (i < num_columns - 1) {
+            types += _file_opts->column_separator;
+        }
+    }
+    types += _file_opts->line_delimiter;
+    return types;
+}
+
+Status VFileResultWriter::write_csv_header() {
+    if (!_header_sent && _header.size() > 0) {
+        std::string tmp_header = _header;
+        if (_header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
+            tmp_header += gen_types();
+        }
+        size_t written_len = 0;
+        RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const uint8_t*>(tmp_header.c_str()),
+                                            tmp_header.size(), &written_len));
+        _header_sent = true;
+    }
+    return Status::OK();
+}
+
+Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
+    SCOPED_TIMER(_file_write_timer);
+    size_t pos = _plain_text_outstream.tellp();
+    if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
+        return Status::OK();
+    }
+
+    const std::string& buf = _plain_text_outstream.str();
+    size_t written_len = 0;
+    RETURN_IF_ERROR(_file_writer->write(reinterpret_cast<const uint8_t*>(buf.c_str()), buf.size(),
+                                        &written_len));
+    COUNTER_UPDATE(_written_data_bytes, written_len);
+    _current_written_bytes += written_len;
+
+    // clear the stream
+    _plain_text_outstream.str("");
+    _plain_text_outstream.clear();
+
+    // split file if exceed limit
+    return _create_new_file_if_exceed_size();
+}
+
+Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    if (_current_written_bytes < _file_opts->max_file_size_bytes) {
+        return Status::OK();
+    }
+    // current file size exceed the max file size. close this file
+    // and create new one
+    {
+        SCOPED_TIMER(_writer_close_timer);
+        RETURN_IF_ERROR(_close_file_writer(false));
+    }
+    _current_written_bytes = 0;
+    return Status::OK();
+}
+
+Status VFileResultWriter::_close_file_writer(bool done, bool only_close) {

Review Comment:
   please add a comment here to explain what's the meaning of done and only_close



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] yiguolei commented on a diff in pull request #10013: [Feature] Support outfile on vectorized engine

Posted by GitBox <gi...@apache.org>.
yiguolei commented on code in PR #10013:
URL: https://github.com/apache/incubator-doris/pull/10013#discussion_r893208553


##########
be/src/vec/runtime/vfile_result_writer.h:
##########
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/file_result_writer.h"
+#include "vec/sink/result_sink.h"
+
+namespace doris {
+
+namespace vectorized {
+// write result to file
+class VFileResultWriter final : public VResultWriter {
+public:
+    VFileResultWriter(const ResultFileOptions* file_option,
+                      const TStorageBackendType::type storage_type,
+                      const TUniqueId fragment_instance_id,
+                      const std::vector<ExprContext*>& output_expr_ctxs,
+                      RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                      Block* output_block, bool output_object_data,
+                      const RowDescriptor& output_row_descriptor);
+    virtual ~VFileResultWriter();
+
+    virtual Status append_block(Block& block) override;
+    virtual Status append_row_batch(const RowBatch* batch) override {
+        return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!");
+    };
+
+    virtual Status init(RuntimeState* state) override;
+    virtual Status close() override;
+
+    // file result writer always return statistic result in one row
+    virtual int64_t get_written_rows() const override { return 1; }
+
+    std::string gen_types();
+    Status write_csv_header();
+
+private:
+    Status _write_csv_file(const Block& block);
+
+    // if buffer exceed the limit, write the data buffered in _plain_text_outstream via file_writer
+    // if eos, write the data even if buffer is not full.
+    Status _flush_plain_text_outstream(bool eos);
+    void _init_profile();
+
+    Status _create_file_writer(const std::string& file_name);
+    Status _create_next_file_writer();
+    Status _create_success_file();
+    // get next export file name
+    Status _get_next_file_name(std::string* file_name);
+    Status _get_success_file_name(std::string* file_name);
+    Status _get_file_url(std::string* file_url);
+    std::string _file_format_to_name();
+    // close file writer, and if !done, it will create new writer for next file.
+    // if only_close is true, this method will just close the file writer and return.
+    Status _close_file_writer(bool done, bool only_close = false);
+    // create a new file if current file size exceed limit
+    Status _create_new_file_if_exceed_size();
+    // send the final statistic result
+    Status _send_result();
+    // save result into batch rather than send it
+    Status _fill_result_block();
+
+    RuntimeState* _state; // not owned, set when init
+    const ResultFileOptions* _file_opts;
+    TStorageBackendType::type _storage_type;
+    TUniqueId _fragment_instance_id;
+    const std::vector<ExprContext*>& _output_expr_ctxs;
+
+    // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
+    // If the result file format is Parquet, this _file_writer is owned by _parquet_writer.
+    FileWriter* _file_writer = nullptr;

Review Comment:
   Use unique ptr please.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org