Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/11 15:17:40 UTC

[incubator-doris] branch dev-1.0.1 updated (5190828319 -> b8ba96e73e)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from 5190828319 [Bug] [Vectorized] code dump on aggregate node over union node (#10040)
     new 8cdccfb01a [feature](vectorized) Support outfile on vectorized engine (#10013)
     new b8ba96e73e [fix](planner) produce wrong result when use bucket shuffle join with colocate left table (#10045)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/exec/data_sink.cpp                          |  33 +-
 be/src/vec/CMakeLists.txt                          |   2 +
 be/src/vec/runtime/vfile_result_writer.cpp         | 468 +++++++++++++++++++++
 .../runtime/vfile_result_writer.h}                 | 102 ++---
 be/src/vec/sink/vdata_stream_sender.cpp            |  35 ++
 be/src/vec/sink/vdata_stream_sender.h              |  13 +-
 .../sink/vresult_file_sink.cpp}                    |  89 ++--
 .../sink/vresult_file_sink.h}                      |  49 +--
 .../main/java/org/apache/doris/qe/Coordinator.java |  16 +-
 ...ut => test_bucket_join_with_colocate_table.out} |   8 +-
 .../test_bucket_join_with_colocate_table.groovy    |  80 ++++
 11 files changed, 730 insertions(+), 165 deletions(-)
 create mode 100644 be/src/vec/runtime/vfile_result_writer.cpp
 copy be/src/{runtime/file_result_writer.h => vec/runtime/vfile_result_writer.h} (59%)
 copy be/src/{runtime/result_file_sink.cpp => vec/sink/vresult_file_sink.cpp} (66%)
 copy be/src/{runtime/result_file_sink.h => vec/sink/vresult_file_sink.h} (65%)
 copy regression-test/data/correctness/{test_select_constant.out => test_bucket_join_with_colocate_table.out} (66%)
 create mode 100644 regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 02/02: [fix](planner) produce wrong result when use bucket shuffle join with colocate left table (#10045)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit b8ba96e73e070706c0874018a9248f130928b75d
Author: morrySnow <10...@users.noreply.github.com>
AuthorDate: Sat Jun 11 21:44:47 2022 +0800

    [fix](planner) produce wrong result when use bucket shuffle join with colocate left table (#10045)
    
    When planning a bucket shuffle join, we need to know the left table's bucket number.
    Currently, we use the tablet number directly, based on the assumption that the left table has only one partition.
    But when the left table is a colocate table, it can have more than one partition.
    In this case, some data in the right table is dropped incorrectly, producing a wrong result for the query.

    To reproduce, follow the regression test in this PR.
---
 .../main/java/org/apache/doris/qe/Coordinator.java | 16 ++++-
 .../test_bucket_join_with_colocate_table.out       |  7 ++
 .../test_bucket_join_with_colocate_table.groovy    | 80 ++++++++++++++++++++++
 3 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 86225e38b5..75dc09b9f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1850,9 +1850,19 @@ public class Coordinator {
         private void computeScanRangeAssignmentByBucket(
                 final OlapScanNode scanNode, ImmutableMap<Long, Backend> idToBackend, Map<TNetworkAddress, Long> addressToBackendID) throws Exception {
             if (!fragmentIdToSeqToAddressMap.containsKey(scanNode.getFragmentId())) {
-                // The bucket shuffle join only hit when the partition is one. so the totalTabletsNum is all tablet of
-                // one hit partition. can be the right bucket num in bucket shuffle join
-                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), (int)scanNode.getTotalTabletsNum());
+                // In bucket shuffle join, we have 2 situations.
+                // 1. Only one partition: in this case, we use scanNode.getTotalTabletsNum() to get the right bucket num,
+                //    because when the table turns on dynamic partition, the bucket number in the default distribution info
+                //    is not correct.
+                // 2. The table is colocated: in this case, the table can have more than one partition, but every partition's
+                //    bucket number must be the same, so using the default bucket num is OK.
+                int bucketNum = 0;
+                if (scanNode.getOlapTable().isColocateTable()) {
+                    bucketNum = scanNode.getOlapTable().getDefaultDistributionInfo().getBucketNum();
+                } else {
+                    bucketNum = (int) (scanNode.getTotalTabletsNum());
+                }
+                fragmentIdToBucketNumMap.put(scanNode.getFragmentId(), bucketNum);
                 fragmentIdToSeqToAddressMap.put(scanNode.getFragmentId(), new HashedMap());
                 fragmentIdBucketSeqToScanRangeMap.put(scanNode.getFragmentId(), new BucketSeqToScanRange());
                 fragmentIdToBuckendIdBucketCountMap.put(scanNode.getFragmentId(), new HashMap<>());
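
For intuition, here is a minimal, self-contained C++ sketch (hypothetical numbers; the
actual fix above is Java) of why the wrong modulus loses right-table rows: with a colocate
left table of P partitions and B buckets per partition, getTotalTabletsNum() returns P * B,
so hashing right-table join keys modulo P * B can land on bucket sequences >= B that own no
scan range, and those rows are silently dropped.

    #include <iostream>

    int main() {
        const int partitions = 2, buckets_per_partition = 8;          // hypothetical colocate table
        const int wrong_modulus = partitions * buckets_per_partition; // == totalTabletsNum
        for (int hash = 0; hash < wrong_modulus; ++hash) {            // pretend hash values of join keys
            int wrong_seq = hash % wrong_modulus;                     // may be >= buckets_per_partition
            int right_seq = hash % buckets_per_partition;             // always a real bucket
            if (wrong_seq >= buckets_per_partition) {
                std::cout << "hash " << hash << ": seq " << wrong_seq
                          << " has no owning backend (row dropped); correct seq is "
                          << right_seq << "\n";
            }
        }
    }
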
diff --git a/regression-test/data/correctness/test_bucket_join_with_colocate_table.out b/regression-test/data/correctness/test_bucket_join_with_colocate_table.out
new file mode 100644
index 0000000000..0cf6530614
--- /dev/null
+++ b/regression-test/data/correctness/test_bucket_join_with_colocate_table.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !select --
+2	2	2	2	2
+1	1	1	1	1
+3	3	3	3	3
+\N	\N	\N	4	4
+
diff --git a/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy b/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy
new file mode 100644
index 0000000000..4c8f0d0f18
--- /dev/null
+++ b/regression-test/suites/correctness/test_bucket_join_with_colocate_table.groovy
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_bucket_join_with_colocate_table") {
+    def colocateTableName = "colocate_table"
+    def rightTable = "right_table"
+
+
+    sql """ DROP TABLE IF EXISTS ${colocateTableName} """
+    sql """ DROP TABLE IF EXISTS ${rightTable} """
+    sql """
+        CREATE TABLE `${colocateTableName}` (
+          `c1` int(11) NULL COMMENT "",
+          `c2` int(11) NULL COMMENT "",
+          `c3` int(11) NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`c1`, `c2`, `c3`)
+        COMMENT "OLAP"
+        PARTITION BY RANGE(`c2`)
+        (PARTITION p1 VALUES [("-2147483648"), ("2")),
+        PARTITION p2 VALUES [("2"), (MAXVALUE)))
+        DISTRIBUTED BY HASH(`c1`) BUCKETS 8
+        PROPERTIES (
+          "replication_allocation" = "tag.location.default: 1",
+          "colocate_with" = "group1",
+          "in_memory" = "false",
+          "storage_format" = "V2"
+        )
+    """
+    sql """
+        CREATE TABLE `${rightTable}` (
+          `k1` int(11) NOT NULL COMMENT "",
+          `v1` int(11) NOT NULL COMMENT ""
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`k1`, `v1`)
+        COMMENT "OLAP"
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 10
+        PROPERTIES (
+          "replication_allocation" = "tag.location.default: 1",
+          "in_memory" = "false",
+          "storage_format" = "V2"
+        )
+    """
+
+    sql """ INSERT INTO ${colocateTableName} VALUES
+        (0, 0, 0),
+        (1, 1, 1),
+        (2, 2, 2),
+        (3, 3, 3)
+        ;
+    """
+
+    sql """ INSERT INTO ${rightTable} VALUES
+        (1, 1),
+        (2, 2),
+        (3, 3),
+        (4, 4)
+        ;
+    """
+
+    // test_vectorized
+    sql """ set enable_vectorized_engine = true; """
+
+    qt_select """  select * from ${colocateTableName} right outer join ${rightTable} on ${colocateTableName}.c1 = ${rightTable}.k1; """
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/02: [feature](vectorized) Support outfile on vectorized engine (#10013)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 8cdccfb01a5ddd12d8ecef6f4870e8bb6235257d
Author: Gabriel <ga...@gmail.com>
AuthorDate: Fri Jun 10 09:15:53 2022 +0800

    [feature](vectorized) Support outfile on vectorized engine (#10013)
    
    This PR supports outputting CSV-format files on the vectorized engine.
    
    ** Parquet is still not supported. **
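
A minimal sketch of the CSV row convention used by _write_csv_file() in the diff below
(a hypothetical helper; it assumes the default \N null marker and comma/newline
separators, which the actual outfile properties make configurable):

    #include <iostream>
    #include <optional>
    #include <sstream>
    #include <string>
    #include <vector>

    // Emit one CSV row: nulls become \N, columns are joined by the separator,
    // and the row ends with the line delimiter.
    std::string to_csv_row(const std::vector<std::optional<std::string>>& cols,
                           const std::string& sep = ",", const std::string& eol = "\n") {
        std::ostringstream out;
        for (size_t i = 0; i < cols.size(); ++i) {
            out << (cols[i] ? *cols[i] : "\\N");
            if (i + 1 < cols.size()) out << sep;
        }
        out << eol;
        return out.str();
    }

    int main() {
        std::cout << to_csv_row({std::string("1"), std::nullopt, std::string("3.14")});
        // prints: 1,\N,3.14
    }
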
---
 be/src/exec/data_sink.cpp                  |  33 +-
 be/src/vec/CMakeLists.txt                  |   2 +
 be/src/vec/runtime/vfile_result_writer.cpp | 468 +++++++++++++++++++++++++++++
 be/src/vec/runtime/vfile_result_writer.h   | 124 ++++++++
 be/src/vec/sink/vdata_stream_sender.cpp    |  35 +++
 be/src/vec/sink/vdata_stream_sender.h      |  13 +-
 be/src/vec/sink/vresult_file_sink.cpp      | 203 +++++++++++++
 be/src/vec/sink/vresult_file_sink.h        |  70 +++++
 8 files changed, 939 insertions(+), 9 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index a0ef6336e5..a11556310a 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -38,6 +38,7 @@
 #include "vec/sink/result_sink.h"
 #include "vec/sink/vdata_stream_sender.h"
 #include "vec/sink/vmysql_table_writer.h"
+#include "vec/sink/vresult_file_sink.h"
 #include "vec/sink/vtablet_sink.h"
 #include "vec/sink/vmysql_table_sink.h"
 
@@ -91,13 +92,35 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         if (!thrift_sink.__isset.result_file_sink) {
             return Status::InternalError("Missing result file sink.");
         }
-        // Result file sink is not the top sink
-        if (params.__isset.destinations && params.destinations.size() > 0) {
-            tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
-                                          params.destinations, pool, params.sender_id, desc_tbl);
+
+        // TODO: figure out good buffer size based on size of output row
+        if (is_vec) {
+            bool send_query_statistics_with_every_batch =
+                    params.__isset.send_query_statistics_with_every_batch
+                            ? params.send_query_statistics_with_every_batch
+                            : false;
+            // Result file sink is not the top sink
+            if (params.__isset.destinations && params.destinations.size() > 0) {
+                tmp_sink = new doris::vectorized::VResultFileSink(
+                        pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
+                        params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
+                        output_exprs, desc_tbl);
+            } else {
+                tmp_sink = new doris::vectorized::VResultFileSink(
+                        pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                        send_query_statistics_with_every_batch, output_exprs);
+            }
         } else {
-            tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
+            // Result file sink is not the top sink
+            if (params.__isset.destinations && params.destinations.size() > 0) {
+                tmp_sink =
+                        new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
+                                           params.destinations, pool, params.sender_id, desc_tbl);
+            } else {
+                tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
+            }
         }
+
         sink->reset(tmp_sink);
         break;
     }
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 2d30b33f50..afc95e77a9 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -173,9 +173,11 @@ set(VEC_FILES
   sink/vtablet_sink.cpp
   sink/vmysql_table_writer.cpp
   sink/vmysql_table_sink.cpp
+  sink/vresult_file_sink.cpp
   runtime/vdatetime_value.cpp
   runtime/vdata_stream_recvr.cpp
   runtime/vdata_stream_mgr.cpp
+  runtime/vfile_result_writer.cpp
   runtime/vpartition_info.cpp
   runtime/vsorted_run_merger.cpp)
 
diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
new file mode 100644
index 0000000000..8d70baa854
--- /dev/null
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -0,0 +1,468 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/runtime/vfile_result_writer.h"
+
+#include "exprs/expr_context.h"
+#include "gutil/strings/numbers.h"
+#include "gutil/strings/substitute.h"
+#include "exec/file_writer.h"
+#include "exec/broker_writer.h"
+#include "exec/hdfs_reader_writer.h"
+#include "exec/local_file_writer.h"
+#include "exec/s3_writer.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/descriptors.h"
+#include "runtime/large_int_value.h"
+#include "runtime/runtime_state.h"
+#include "runtime/string_value.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/mysql_global.h"
+#include "util/mysql_row_buffer.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+const size_t VFileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
+using doris::operator<<;
+
+VFileResultWriter::VFileResultWriter(const ResultFileOptions* file_opts,
+                                     const TStorageBackendType::type storage_type,
+                                     const TUniqueId fragment_instance_id,
+                                     const std::vector<ExprContext*>& output_expr_ctxs,
+                                     RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                     Block* output_block, bool output_object_data,
+                                     const RowDescriptor& output_row_descriptor)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_block(output_block),
+          _output_row_descriptor(output_row_descriptor) {
+    _output_object_data = output_object_data;
+}
+
+Status VFileResultWriter::init(RuntimeState* state) {
+    _state = state;
+    _init_profile();
+    return _create_next_file_writer();
+}
+
+void VFileResultWriter::_init_profile() {
+    RuntimeProfile* profile = _parent_profile->create_child("VFileResultWriter", true, true);
+    _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime");
+    _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime");
+    _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime");
+    _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime");
+    _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT);
+    _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES);
+}
+
+Status VFileResultWriter::_create_success_file() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_success_file_name(&file_name));
+    RETURN_IF_ERROR(_create_file_writer(file_name));
+    return _close_file_writer(true);
+}
+
+Status VFileResultWriter::_get_success_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << _file_opts->success_file_name;
+    *file_name = ss.str();
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+Status VFileResultWriter::_create_next_file_writer() {
+    std::string file_name;
+    RETURN_IF_ERROR(_get_next_file_name(&file_name));
+    return _create_file_writer(file_name);
+}
+
+Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        _file_writer_impl.reset(new LocalFileWriter(file_name, 0 /* start offset */));
+    } else if (_storage_type == TStorageBackendType::BROKER) {
+        _file_writer_impl.reset(new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses, _file_opts->broker_properties, file_name, 0 /*start offset*/));
+    } else if (_storage_type == TStorageBackendType::S3) {
+        _file_writer_impl.reset(new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */));
+    } else if (_storage_type == TStorageBackendType::HDFS) {
+        FileWriter* tmp_writer = nullptr;
+        RETURN_IF_ERROR(HdfsReaderWriter::create_writer(
+                const_cast<std::map<std::string, std::string>&>(_file_opts->broker_properties),
+                file_name, &tmp_writer));
+        _file_writer_impl.reset(tmp_writer);
+    }
+    
+    RETURN_IF_ERROR(_file_writer_impl->open());
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        // the plain file writer alone is enough
+        break;
+    case TFileFormatType::FORMAT_PARQUET:
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+        break;
+    default:
+        return Status::InternalError(
+                strings::Substitute("unsupported file format: $0", _file_opts->file_format));
+    }
+    LOG(INFO) << "create file for exporting query result. file name: " << file_name
+              << ". query id: " << print_id(_state->query_id())
+              << " format:" << _file_opts->file_format;
+    return Status::OK();
+}
+
+// file name format: my_prefix_{fragment_instance_id}_0.csv
+Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
+    std::stringstream ss;
+    ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "."
+       << _file_format_to_name();
+    *file_name = ss.str();
+    _header_sent = false;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        // For local file writer, the file_path is a local dir.
+        // Here we do a simple security verification by checking whether the file exists.
+        // Because the file path is currently arbitrarily specified by the user,
+        // Doris is not responsible for ensuring the correctness of the path.
+        // This is just to prevent overwriting the existing file.
+        if (FileUtils::check_exist(*file_name)) {
+            return Status::InternalError("File already exists: " + *file_name +
+                                         ". Host: " + BackendOptions::get_localhost());
+        }
+    }
+
+    return Status::OK();
+}
+
+// file url format:
+// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
+// S3: {file_path}{fragment_instance_id}_
+// BROKER: {file_path}{fragment_instance_id}_
+
+Status VFileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream ss;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        ss << "file:///" << BackendOptions::get_localhost();
+    }
+    ss << _file_opts->file_path;
+    ss << print_id(_fragment_instance_id) << "_";
+    *file_url = ss.str();
+    return Status::OK();
+}
+
+std::string VFileResultWriter::_file_format_to_name() {
+    switch (_file_opts->file_format) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+        return "csv";
+    case TFileFormatType::FORMAT_PARQUET:
+        return "parquet";
+    default:
+        return "unknown";
+    }
+}
+
+Status VFileResultWriter::append_block(Block& block) {
+    if (block.rows() == 0) {
+        return Status::OK();
+    }
+    SCOPED_TIMER(_append_row_batch_timer);
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else {
+        RETURN_IF_ERROR(_write_csv_file(block));
+    }
+
+    _written_rows += block.rows();
+    return Status::OK();
+}
+
+Status VFileResultWriter::_write_csv_file(const Block& block) {
+    for (size_t i = 0; i < block.rows(); i++) {
+        for (size_t col_id = 0; col_id < block.columns(); col_id++) {
+            auto col = block.get_by_position(col_id);
+            if (col.column->is_null_at(i)) {
+                _plain_text_outstream << NULL_IN_CSV;
+            } else {
+                switch (_output_expr_ctxs[col_id]->root()->type().type) {
+                case TYPE_BOOLEAN:
+                case TYPE_TINYINT:
+                    _plain_text_outstream << (int)*reinterpret_cast<const int8_t*>(
+                            col.column->get_data_at(i).data);
+                    break;
+                case TYPE_SMALLINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int16_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_INT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int32_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_BIGINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const int64_t*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_LARGEINT:
+                    _plain_text_outstream
+                            << *reinterpret_cast<const __int128*>(col.column->get_data_at(i).data);
+                    break;
+                case TYPE_FLOAT: {
+                    char buffer[MAX_FLOAT_STR_LENGTH + 2];
+                    float float_value =
+                            *reinterpret_cast<const float*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = FloatToBuffer(float_value, MAX_FLOAT_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt float failed, float value=" << float_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DOUBLE: {
+                    // To prevent loss of precision on float and double types,
+                    // they are converted to strings before output.
+                    // For example, for the double value 27361919854.929001,
+                    // outputting it directly through std::stringstream yields 2.73619e+10,
+                    // while converting it to a string first yields 27361919854.929001.
+                    char buffer[MAX_DOUBLE_STR_LENGTH + 2];
+                    double double_value =
+                            *reinterpret_cast<const double*>(col.column->get_data_at(i).data);
+                    buffer[0] = '\0';
+                    int length = DoubleToBuffer(double_value, MAX_DOUBLE_STR_LENGTH, buffer);
+                    DCHECK(length >= 0) << "gcvt double failed, double value=" << double_value;
+                    _plain_text_outstream << buffer;
+                    break;
+                }
+                case TYPE_DATE:
+                case TYPE_DATETIME: {
+                    char buf[64];
+                    const VecDateTimeValue* time_val =
+                            (const VecDateTimeValue*)(col.column->get_data_at(i).data);
+                    time_val->to_string(buf);
+                    _plain_text_outstream << buf;
+                    break;
+                }
+                case TYPE_OBJECT:
+                case TYPE_HLL:
+                case TYPE_VARCHAR:
+                case TYPE_CHAR:
+                case TYPE_STRING: {
+                    auto value = col.column->get_data_at(i);
+                    _plain_text_outstream << value;
+                    break;
+                }
+                case TYPE_DECIMALV2: {
+                    const DecimalV2Value decimal_val(
+                            reinterpret_cast<const PackedInt128*>(col.column->get_data_at(i).data)
+                                    ->value);
+                    std::string decimal_str;
+                    int output_scale = _output_expr_ctxs[col_id]->root()->output_scale();
+                    decimal_str = decimal_val.to_string(output_scale);
+                    _plain_text_outstream << decimal_str;
+                    break;
+                }
+                default: {
+                    // unsupported types, like BITMAP and HLL, are exported as null
+                    _plain_text_outstream << NULL_IN_CSV;
+                }
+                }
+            }
+            if (col_id < block.columns() - 1) {
+                _plain_text_outstream << _file_opts->column_separator;
+            }
+        }
+        _plain_text_outstream << _file_opts->line_delimiter;
+    }
+
+    return _flush_plain_text_outstream(true);
+}
+
+Status VFileResultWriter::_flush_plain_text_outstream(bool eos) {
+    SCOPED_TIMER(_file_write_timer);
+    size_t pos = _plain_text_outstream.tellp();
+    if (pos == 0 || (pos < OUTSTREAM_BUFFER_SIZE_BYTES && !eos)) {
+        return Status::OK();
+    }
+
+    const std::string& buf = _plain_text_outstream.str();
+    size_t written_len = 0;
+    RETURN_IF_ERROR(_file_writer_impl->write(reinterpret_cast<const uint8_t*>(buf.c_str()),
+                                             buf.size(), &written_len));
+    COUNTER_UPDATE(_written_data_bytes, written_len);
+    _current_written_bytes += written_len;
+
+    // clear the stream
+    _plain_text_outstream.str("");
+    _plain_text_outstream.clear();
+
+    // split file if exceed limit
+    return _create_new_file_if_exceed_size();
+}
+
+Status VFileResultWriter::_create_new_file_if_exceed_size() {
+    if (_current_written_bytes < _file_opts->max_file_size_bytes) {
+        return Status::OK();
+    }
+    // current file size exceed the max file size. close this file
+    // and create new one
+    {
+        SCOPED_TIMER(_writer_close_timer);
+        RETURN_IF_ERROR(_close_file_writer(false));
+    }
+    _current_written_bytes = 0;
+    return Status::OK();
+}
+
+Status VFileResultWriter::_close_file_writer(bool done) {
+    if (_parquet_writer != nullptr) {
+        return Status::NotSupported("Parquet Writer is not supported yet!");
+    } else if (_file_writer_impl) {
+        _file_writer_impl->close();
+    }
+
+    if (!done) {
+        // not finished, create new file writer for next file
+        RETURN_IF_ERROR(_create_next_file_writer());
+    } else {
+        // All data is written to file, send statistic result
+        if (_file_opts->success_file_name != "") {
+            // write success file, just need to touch an empty file
+            RETURN_IF_ERROR(_create_success_file());
+        }
+        if (_output_block == nullptr) {
+            RETURN_IF_ERROR(_send_result());
+        } else {
+            RETURN_IF_ERROR(_fill_result_block());
+        }
+    }
+    return Status::OK();
+}
+
+Status VFileResultWriter::_send_result() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+    // The final stat result includes:
+    // FileNumber, TotalRows, FileSize and URL.
+    // The types of these fields should be consistent with the types defined
+    // in OutFileClause.java of the FE.
+    MysqlRowBuffer row_buffer;
+    row_buffer.push_int(_file_idx);                         // file number
+    row_buffer.push_bigint(_written_rows_counter->value()); // total rows
+    row_buffer.push_bigint(_written_data_bytes->value());   // file size
+    std::string file_url;
+    _get_file_url(&file_url);
+    row_buffer.push_string(file_url.c_str(), file_url.length()); // url
+
+    std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
+    result->result_batch.rows.resize(1);
+    result->result_batch.rows[0].assign(row_buffer.buf(), row_buffer.length());
+    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(result), "failed to send outfile result");
+    return Status::OK();
+}
+
+Status VFileResultWriter::_fill_result_block() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+#ifndef INSERT_TO_COLUMN
+#define INSERT_TO_COLUMN                                                            \
+    if (i == 0) {                                                                   \
+        column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0);          \
+    } else if (i == 1) {                                                            \
+        int64_t written_rows = _written_rows_counter->value();                      \
+        column->insert_data(reinterpret_cast<const char*>(&written_rows), 0);       \
+    } else if (i == 2) {                                                            \
+        int64_t written_data_bytes = _written_data_bytes->value();                  \
+        column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \
+    } else if (i == 3) {                                                            \
+        std::string file_url;                                                       \
+        _get_file_url(&file_url);                                                   \
+        column->insert_data(file_url.c_str(), file_url.size());                     \
+    }                                                                               \
+    _output_block->replace_by_position(i, std::move(column));
+#endif
+
+    for (int i = 0; i < _output_block->columns(); i++) {
+        switch (_output_row_descriptor.tuple_descriptors()[0]->slots()[i]->type().type) {
+        case TYPE_INT: {
+            auto column = ColumnVector<int32_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_BIGINT: {
+            auto column = ColumnVector<int64_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_LARGEINT: {
+            auto column = ColumnVector<int128_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_SMALLINT: {
+            auto column = ColumnVector<int16_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_TINYINT: {
+            auto column = ColumnVector<int8_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        case TYPE_VARCHAR:
+        case TYPE_CHAR:
+        case TYPE_STRING: {
+            auto column = ColumnVector<int8_t>::create();
+            INSERT_TO_COLUMN;
+            break;
+        }
+        default:
+            return Status::InternalError(strings::Substitute(
+                    "Invalid type to print: $0",
+                    _output_row_descriptor.tuple_descriptors()[0]->slots()[i]->type().type));
+        }
+    }
+    return Status::OK();
+}
+
+Status VFileResultWriter::close() {
+    // The following two profile counters, "_written_rows_counter" and
+    // "_writer_close_timer", must be updated outside `_close_file_writer()`,
+    // because `_close_file_writer()` may be called in the destructor, at
+    // which point the RuntimeState, and the profile it owns, may already
+    // have been destructed.
+    COUNTER_SET(_written_rows_counter, _written_rows);
+    SCOPED_TIMER(_writer_close_timer);
+    return _close_file_writer(true);
+}
+
+} // namespace doris::vectorized
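
For orientation, a compressed, self-contained sketch of the buffering and file-splitting
flow implemented by _flush_plain_text_outstream() and _create_new_file_if_exceed_size()
above (hypothetical names and thresholds, not the BE code): rows accumulate in a
stringstream, the buffer is flushed once it passes a size threshold or on eos, and output
rolls to a new {file_path}{fragment_instance_id}_{idx}.csv once max_file_size_bytes is
exceeded.

    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>

    struct MiniCsvWriter {
        std::string prefix = "result_frag123_"; // hypothetical {file_path}{fragment_instance_id}_
        size_t buffer_limit = 64;               // stand-in for OUTSTREAM_BUFFER_SIZE_BYTES
        int64_t max_file_size = 128;            // stand-in for _file_opts->max_file_size_bytes
        int file_idx = 0;                       // suffix of the current output file
        int64_t current_written = 0;            // bytes written to the current file
        std::ostringstream buf;                 // stand-in for _plain_text_outstream

        std::string next_file_name() { return prefix + std::to_string(file_idx++) + ".csv"; }

        void append_row(const std::string& row) {
            buf << row << '\n';
            if (static_cast<size_t>(buf.tellp()) >= buffer_limit) flush(false);
        }

        void flush(bool eos) {                  // mirrors _flush_plain_text_outstream(eos)
            std::string data = buf.str();
            if (data.empty() && !eos) return;
            current_written += data.size();     // a real writer hands `data` to a FileWriter here
            buf.str("");
            buf.clear();
            if (current_written >= max_file_size) { // mirrors _create_new_file_if_exceed_size()
                current_written = 0;
                std::cout << "rolling to " << next_file_name() << "\n";
            }
        }
    };

    int main() {
        MiniCsvWriter w;
        for (int i = 0; i < 30; ++i) w.append_row("1,foo,3.14");
        w.flush(true);                          // eos: force out whatever remains
    }
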
diff --git a/be/src/vec/runtime/vfile_result_writer.h b/be/src/vec/runtime/vfile_result_writer.h
new file mode 100644
index 0000000000..b7fd2cd737
--- /dev/null
+++ b/be/src/vec/runtime/vfile_result_writer.h
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "exec/file_writer.h"
+#include "runtime/file_result_writer.h"
+#include "vec/sink/result_sink.h"
+
+namespace doris {
+
+namespace vectorized {
+// write result to file
+class VFileResultWriter final : public VResultWriter {
+public:
+    VFileResultWriter(const ResultFileOptions* file_option,
+                      const TStorageBackendType::type storage_type,
+                      const TUniqueId fragment_instance_id,
+                      const std::vector<ExprContext*>& output_expr_ctxs,
+                      RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                      Block* output_block, bool output_object_data,
+                      const RowDescriptor& output_row_descriptor);
+    virtual ~VFileResultWriter() = default;
+
+    virtual Status append_block(Block& block) override;
+    virtual Status append_row_batch(const RowBatch* batch) override {
+        return Status::NotSupported("append_row_batch is not supported in VFileResultWriter!");
+    };
+
+    virtual Status init(RuntimeState* state) override;
+    virtual Status close() override;
+
+    // the file result writer always returns its statistic result in one row
+    virtual int64_t get_written_rows() const override { return 1; }
+
+private:
+    Status _write_csv_file(const Block& block);
+
+    // If the buffer exceeds the limit, write the data buffered in _plain_text_outstream via the file writer.
+    // If eos, write the data even if the buffer is not full.
+    Status _flush_plain_text_outstream(bool eos);
+    void _init_profile();
+
+    Status _create_file_writer(const std::string& file_name);
+    Status _create_next_file_writer();
+    Status _create_success_file();
+    // get next export file name
+    Status _get_next_file_name(std::string* file_name);
+    Status _get_success_file_name(std::string* file_name);
+    Status _get_file_url(std::string* file_url);
+    std::string _file_format_to_name();
+    // Close the file writer; if !done, create a new writer for the next file.
+    // If done, write the success file and send or fill the final result.
+    Status _close_file_writer(bool done);
+    // create a new file if the current file size exceeds the limit
+    Status _create_new_file_if_exceed_size();
+    // send the final statistic result
+    Status _send_result();
+    // save the result into the output block rather than sending it
+    Status _fill_result_block();
+
+    RuntimeState* _state; // not owned, set when init
+    const ResultFileOptions* _file_opts;
+    TStorageBackendType::type _storage_type;
+    TUniqueId _fragment_instance_id;
+    const std::vector<ExprContext*>& _output_expr_ctxs;
+
+    // If the result file format is plain text, like CSV, _file_writer_impl is owned by this writer.
+    // If the result file format is Parquet, it is owned by _parquet_writer.
+    std::unique_ptr<FileWriter> _file_writer_impl;
+    // parquet file writer
+    ParquetWriterWrapper* _parquet_writer = nullptr;
+    // Used to buffer the export data of plain text.
+    // TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling
+    // the file writer's write() for every single row.
+    // But this cannot solve the problem of a single row that is too large.
+    // For example: bitmap_to_string() may return a large volume of data.
+    // And the speed is relatively low; in my test, about 6.5 MB/s.
+    std::stringstream _plain_text_outstream;
+    static const size_t OUTSTREAM_BUFFER_SIZE_BYTES;
+
+    // bytes written to the current file, used to decide when to split
+    int64_t _current_written_bytes = 0;
+    // the suffix idx of the export file name, starting at 0
+    int _file_idx = 0;
+
+    RuntimeProfile* _parent_profile; // profile from result sink, not owned
+    // total time cost on append batch operation
+    RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
+    // tuple convert timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
+    // file write timer, child timer of _append_row_batch_timer
+    RuntimeProfile::Counter* _file_write_timer = nullptr;
+    // time of closing the file writer
+    RuntimeProfile::Counter* _writer_close_timer = nullptr;
+    // number of written rows
+    RuntimeProfile::Counter* _written_rows_counter = nullptr;
+    // bytes of written data
+    RuntimeProfile::Counter* _written_data_bytes = nullptr;
+
+    // _sinker and _output_block are not owned by VFileResultWriter
+    BufferControlBlock* _sinker = nullptr;
+    Block* _output_block = nullptr;
+    // set to true if the final statistic result is sent
+    bool _is_result_sent = false;
+    bool _header_sent = false;
+    RowDescriptor _output_row_descriptor;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 249f70ac0f..cbadfc931f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -290,6 +290,41 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
     _name = "VDataStreamSender";
 }
 
+VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                                     const std::vector<TPlanFragmentDestination>& destinations,
+                                     int per_channel_buffer_size,
+                                     bool send_query_statistics_with_every_batch)
+        : _sender_id(sender_id),
+          _pool(pool),
+          _row_desc(row_desc),
+          _current_channel_idx(0),
+          _ignore_not_found(true),
+          _cur_pb_block(&_pb_block1),
+          _profile(nullptr),
+          _serialize_batch_timer(nullptr),
+          _bytes_sent_counter(nullptr),
+          _local_bytes_send_counter(nullptr),
+          _dest_node_id(0) {
+    _name = "VDataStreamSender";
+}
+
+VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
+                                     int per_channel_buffer_size,
+                                     bool send_query_statistics_with_every_batch)
+        : _sender_id(0),
+          _pool(pool),
+          _row_desc(row_desc),
+          _current_channel_idx(0),
+          _ignore_not_found(true),
+          _cur_pb_block(&_pb_block1),
+          _profile(nullptr),
+          _serialize_batch_timer(nullptr),
+          _bytes_sent_counter(nullptr),
+          _local_bytes_send_counter(nullptr),
+          _dest_node_id(0) {
+    _name = "VDataStreamSender";
+}
+
 VDataStreamSender::~VDataStreamSender() {
     _channel_shared_ptrs.clear();
 }
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index d62ebeb684..67faae452b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -45,13 +45,20 @@ namespace vectorized {
 class VExprContext;
 class VPartitionInfo;
 
-class VDataStreamSender final : public DataSink {
+class VDataStreamSender : public DataSink {
 public:
     VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
                       const TDataStreamSink& sink,
                       const std::vector<TPlanFragmentDestination>& destinations,
                       int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
 
+    VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                      const std::vector<TPlanFragmentDestination>& destinations,
+                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
+
+    VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size,
+                      bool send_query_statistics_with_every_batch);
+
     ~VDataStreamSender();
 
     virtual Status init(const TDataSink& thrift_sink) override;
@@ -69,10 +76,8 @@ public:
 
     Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
 
-private:
+protected:
     void _roll_pb_block();
-
-private:
     class Channel;
 
     Status get_partition_column_result(Block* block, int* result) const {
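
The header diff above removes `final` from VDataStreamSender and makes the channel state
protected, and the .cpp diff adds constructors that take no TDataStreamSink, so that
VResultFileSink (below) can reuse the sender's channel machinery while wiring up its own
single channel. A minimal sketch of that pattern, with hypothetical names:

    #include <memory>
    #include <vector>

    struct Channel {};            // hypothetical stand-in for VDataStreamSender::Channel

    class StreamSenderBase {      // was `final` with private members before this change
    public:
        explicit StreamSenderBase(int sender_id) : _sender_id(sender_id) {}
        virtual ~StreamSenderBase() = default;
    protected:                    // subclasses may now manage their own channels
        int _sender_id;
        std::vector<std::unique_ptr<Channel>> _channels;
    };

    class FileSink : public StreamSenderBase {
    public:
        FileSink() : StreamSenderBase(0) {
            _channels.emplace_back(new Channel()); // exactly one upstream channel
        }
    };

    int main() { FileSink sink; }
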
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
new file mode 100644
index 0000000000..6d8d994585
--- /dev/null
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/sink/vresult_file_sink.h"
+
+#include "common/config.h"
+#include "exprs/expr.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/exec_env.h"
+#include "runtime/file_result_writer.h"
+#include "runtime/result_buffer_mgr.h"
+#include "runtime/result_file_sink.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "util/uid_util.h"
+#include "vec/runtime/vfile_result_writer.h"
+
+namespace doris::vectorized {
+
+VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink, int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr)
+        : VDataStreamSender(pool, row_desc, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = true;
+
+    _name = "VResultFileSink";
+}
+
+VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                                 const TResultFileSink& sink,
+                                 const std::vector<TPlanFragmentDestination>& destinations,
+                                 int per_channel_buffer_size,
+                                 bool send_query_statistics_with_every_batch,
+                                 const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs)
+        : VDataStreamSender(pool, sender_id, row_desc, destinations, per_channel_buffer_size,
+                            send_query_statistics_with_every_batch),
+          _t_output_expr(t_output_expr),
+          _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = false;
+    DCHECK_EQ(destinations.size(), 1);
+    _channel_shared_ptrs.emplace_back(new Channel(
+            this, _output_row_descriptor, destinations[0].brpc_server,
+            destinations[0].fragment_instance_id, sink.dest_node_id, _buf_size, true, true));
+    _channels.push_back(_channel_shared_ptrs.back().get());
+
+    _name = "VResultFileSink";
+}
+
+Status VResultFileSink::init(const TDataSink& tsink) {
+    return Status::OK();
+}
+
+Status VResultFileSink::prepare_exprs(RuntimeState* state) {
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker));
+    return Status::OK();
+}
+
+Status VResultFileSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSink::prepare(state));
+    std::stringstream title;
+    title << "VResultFileSink (fragment_instance_id=" << print_id(state->fragment_instance_id())
+          << ")";
+    // create profile
+    _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
+    // prepare output_expr
+    RETURN_IF_ERROR(prepare_exprs(state));
+
+    CHECK(_file_opts.get() != nullptr);
+    if (_is_top_sink) {
+        // create sender
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(
+                state->fragment_instance_id(), _buf_size, &_sender));
+        // create writer
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, _sender.get(), nullptr, state->return_object_data_as_binary(),
+                _output_row_descriptor));
+    } else {
+        // init channel
+        _profile = _pool->add(new RuntimeProfile(title.str()));
+        _state = state;
+        _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
+        _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+        _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
+        _uncompressed_bytes_counter =
+                ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
+        // create writer
+        _output_block.reset(new Block(_output_row_descriptor.tuple_descriptors()[0]->slots(), 1));
+        _writer.reset(new (std::nothrow) VFileResultWriter(
+                _file_opts.get(), _storage_type, state->fragment_instance_id(), _output_expr_ctxs,
+                _profile, nullptr, _output_block.get(), state->return_object_data_as_binary(),
+                _output_row_descriptor));
+    }
+    RETURN_IF_ERROR(_writer->init(state));
+    for (int i = 0; i < _channels.size(); ++i) {
+        RETURN_IF_ERROR(_channels[i]->init(state));
+    }
+    return Status::OK();
+}
+
+Status VResultFileSink::open(RuntimeState* state) {
+    return Expr::open(_output_expr_ctxs, state);
+}
+
+Status VResultFileSink::send(RuntimeState* state, RowBatch* batch) {
+    return Status::NotSupported("Not Implemented VResultFileSink Node::get_next scalar");
+}
+
+Status VResultFileSink::send(RuntimeState* state, Block* block) {
+    RETURN_IF_ERROR(_writer->append_block(*block));
+    return Status::OK();
+}
+
+Status VResultFileSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    Status final_status = exec_status;
+    // close the writer
+    if (_writer) {
+        Status st = _writer->close();
+        if (!st.ok() && exec_status.ok()) {
+            // close file writer failed, should return this error to client
+            final_status = st;
+        }
+    }
+    if (_is_top_sink) {
+        // close sender, this is normal path end
+        if (_sender) {
+            _sender->update_num_written_rows(_writer == nullptr ? 0 : _writer->get_written_rows());
+            _sender->close(final_status);
+        }
+        state->exec_env()->result_mgr()->cancel_at_time(
+                time(nullptr) + config::result_buffer_cancelled_interval_time,
+                state->fragment_instance_id());
+    } else {
+        if (final_status.ok()) {
+            RETURN_IF_ERROR(serialize_block(_output_block.get(), _cur_pb_block, _channels.size()));
+            for (auto channel : _channels) {
+                RETURN_IF_ERROR(channel->send_block(_cur_pb_block));
+            }
+        }
+        Status final_st = Status::OK();
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        // wait all channels to finish
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close_wait(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        _output_block->clear();
+    }
+
+    Expr::close(_output_expr_ctxs, state);
+
+    _closed = true;
+    return Status::OK();
+}
+
+void VResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
+    if (_is_top_sink) {
+        _sender->set_query_statistics(statistics);
+    } else {
+        _query_statistics = statistics;
+    }
+}
+
+} // namespace doris::vectorized
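
A condensed view of the two close() paths above (hypothetical stand-in types, not the BE
API): as the top sink, the writer's one-row summary (FileNumber, TotalRows, FileSize, URL)
goes straight back to the client via the BufferControlBlock, as in
VFileResultWriter::_send_result(); as a non-top sink, _fill_result_block() fills a one-row
block, which VResultFileSink::close() serializes and forwards through its single channel
to the parent fragment.

    #include <cstdint>
    #include <iostream>
    #include <string>

    // Hypothetical stand-in for the outfile summary row.
    struct OutfileSummary {
        int file_number;
        int64_t total_rows;
        int64_t file_size;
        std::string url;
    };

    void close_result_file_sink(bool is_top_sink, const OutfileSummary& s) {
        if (is_top_sink) {
            // Top sink: report the summary row directly to the MySQL client.
            std::cout << "to client: " << s.file_number << " file(s), " << s.total_rows
                      << " row(s), " << s.file_size << " byte(s), url=" << s.url << "\n";
        } else {
            // Non-top sink: the summary block is sent upstream through the channel.
            std::cout << "to parent fragment via channel: url=" << s.url << "\n";
        }
    }

    int main() {
        OutfileSummary s{1, 4, 37, "file:///host/outfile/frag_"};
        close_result_file_sink(true, s);
        close_result_file_sink(false, s);
    }
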
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
new file mode 100644
index 0000000000..e924883b42
--- /dev/null
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/result_file_sink.h"
+#include "vec/sink/vdata_stream_sender.h"
+
+namespace doris {
+namespace vectorized {
+class VResultWriter;
+
+class VResultFileSink : public VDataStreamSender {
+public:
+    VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr);
+    VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
+                    const TResultFileSink& sink,
+                    const std::vector<TPlanFragmentDestination>& destinations,
+                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
+    virtual ~VResultFileSink() = default;
+    virtual Status init(const TDataSink& thrift_sink) override;
+    virtual Status prepare(RuntimeState* state) override;
+    virtual Status open(RuntimeState* state) override;
+    // send data in 'batch' to this backend stream mgr
+    // Blocks until all rows in batch are placed in the buffer
+    virtual Status send(RuntimeState* state, RowBatch* batch) override;
+    virtual Status send(RuntimeState* state, Block* block) override;
+    // Flush all buffered data and close all existing channels to destination
+    // hosts. Further send() calls are illegal after calling close().
+    virtual Status close(RuntimeState* state, Status exec_status) override;
+    virtual RuntimeProfile* profile() override { return _profile; }
+
+    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
+
+private:
+    Status prepare_exprs(RuntimeState* state);
+    // set file options when sink type is FILE
+    std::unique_ptr<ResultFileOptions> _file_opts;
+    TStorageBackendType::type _storage_type;
+
+    // Owned by the RuntimeState.
+    const std::vector<TExpr>& _t_output_expr;
+    std::vector<ExprContext*> _output_expr_ctxs;
+    RowDescriptor _output_row_descriptor;
+
+    std::unique_ptr<Block> _output_block = nullptr;
+    std::shared_ptr<BufferControlBlock> _sender;
+    std::shared_ptr<VResultWriter> _writer;
+    int _buf_size = 1024; // Allocated from _pool
+    bool _is_top_sink = true;
+};
+} // namespace vectorized
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org