Posted to commits@doris.apache.org by mo...@apache.org on 2021/09/07 03:53:42 UTC

[incubator-doris] branch master updated: [Outfile] Support concurrent export of query results (#6539)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9469b2c  [Outfile] Support concurrent export of query results (#6539)
9469b2c is described below

commit 9469b2ce1ab4b6db02dacdd8159d272ed0ed6981
Author: EmmyMiao87 <52...@qq.com>
AuthorDate: Tue Sep 7 11:53:32 2021 +0800

    [Outfile] Support concurrent export of query results (#6539)
    
    This PR mainly adds support for:
    1. Exporting query result sets concurrently
    2. Exporting query result sets via the S3 protocol
    
    There are several preconditions for exporting a query result set concurrently:
    1. The concurrent export session variable is enabled
    2. The query itself can be exported concurrently
        (queries containing a sort node at the top level cannot be exported concurrently)
    3. The export uses the S3 protocol instead of a broker
    
    When the result set is exported concurrently,
    the file prefix is changed to outfile_{query_instance_id}_filenumber.{file_format}
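    
    A minimal usage sketch (the table name, bucket, and S3 property values below are
    placeholders; see the updated outfile documentation in this commit for the full
    property list):
    
        -- enable the concurrent export session variable
        set enable_parallel_outfile = true;
        -- check the plan first: RESULT FILE SINK in PLAN FRAGMENT 1 means the export
        -- will run concurrently; in PLAN FRAGMENT 0 it will not
        explain select k1 from tb1
        into outfile "s3://my_bucket/export/my_file_"
        format as csv
        properties ("AWS_ENDPOINT" = "http://s3.example.com", "AWS_ACCESS_KEY" = "xxxx",
                    "AWS_SECRET_KEY" = "xxxx", "AWS_REGION" = "xxx");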
---
 be/src/exec/data_sink.cpp                          |  27 ++-
 be/src/exec/data_sink.h                            |   4 +-
 be/src/exec/s3_writer.cpp                          |   5 -
 be/src/exec/s3_writer.h                            |   1 -
 be/src/runtime/CMakeLists.txt                      |   1 +
 be/src/runtime/data_stream_sender.cpp              | 182 +++++--------------
 be/src/runtime/data_stream_sender.h                | 156 ++++++++++++++---
 be/src/runtime/file_result_writer.cpp              | 105 +++++++++--
 be/src/runtime/file_result_writer.h                |  15 ++
 be/src/runtime/plan_fragment_executor.cpp          |   3 +-
 be/src/runtime/result_file_sink.cpp                | 194 +++++++++++++++++++++
 be/src/runtime/result_file_sink.h                  |  84 +++++++++
 be/src/runtime/result_sink.cpp                     |   1 +
 docs/en/administrator-guide/outfile.md             | 132 ++++++++++++--
 docs/zh-CN/administrator-guide/outfile.md          | 128 ++++++++++++--
 .../doris/analysis/CreateRoutineLoadStmt.java      |   8 +-
 .../org/apache/doris/analysis/OutFileClause.java   |  46 +++--
 .../java/org/apache/doris/analysis/SelectStmt.java |   2 +-
 .../java/org/apache/doris/backup/S3Storage.java    |   7 +-
 .../main/java/org/apache/doris/load/ExportJob.java |   5 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   8 +-
 .../apache/doris/load/update/UpdatePlanner.java    |   2 +-
 .../org/apache/doris/planner/ExchangeNode.java     |   7 +
 .../org/apache/doris/planner/PlanFragment.java     |  39 +++--
 .../java/org/apache/doris/planner/PlanNode.java    |   4 +
 .../java/org/apache/doris/planner/Planner.java     | 116 +++++++++---
 .../{ResultSink.java => ResultFileSink.java}       | 103 ++++++-----
 .../java/org/apache/doris/planner/ResultSink.java  |  41 +----
 .../apache/doris/planner/StreamLoadPlanner.java    |   2 +-
 .../main/java/org/apache/doris/qe/Coordinator.java |  29 +--
 .../java/org/apache/doris/qe/SessionVariable.java  |  14 +-
 .../doris/analysis/CreateRoutineLoadStmtTest.java  |  13 +-
 gensrc/thrift/DataSinks.thrift                     |   9 +
 33 files changed, 1109 insertions(+), 384 deletions(-)

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index 123f621..7b2e336 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -33,6 +33,7 @@
 #include "runtime/mysql_table_sink.h"
 #include "runtime/odbc_table_sink.h"
 #include "runtime/result_sink.h"
+#include "runtime/result_file_sink.h"
 #include "runtime/runtime_state.h"
 #include "util/logging.h"
 
@@ -43,7 +44,8 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
                                   const TPlanFragmentExecParams& params,
                                   const RowDescriptor& row_desc,
                                   bool is_vec,
-                                  boost::scoped_ptr<DataSink>* sink) {
+                                  boost::scoped_ptr<DataSink>* sink,
+                                  DescriptorTbl& desc_tbl) {
     DataSink* tmp_sink = NULL;
 
     switch (thrift_sink.type) {
@@ -66,7 +68,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         sink->reset(tmp_sink);
         break;
     }
-    case TDataSinkType::RESULT_SINK:
+    case TDataSinkType::RESULT_SINK: {
         if (!thrift_sink.__isset.result_sink) {
             return Status::InternalError("Missing data buffer sink.");
         }
@@ -78,7 +80,22 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         }
         sink->reset(tmp_sink);
         break;
-    case TDataSinkType::MEMORY_SCRATCH_SINK:
+    }
+    case TDataSinkType::RESULT_FILE_SINK: {
+        if (!thrift_sink.__isset.result_file_sink) {
+            return Status::InternalError("Missing result file sink.");
+        }
+        // Result file sink is not the top sink
+        if (params.__isset.destinations && params.destinations.size() > 0) {
+            tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink,
+                                          params.destinations, pool, params.sender_id, desc_tbl);
+        } else {
+            tmp_sink = new ResultFileSink(row_desc, output_exprs, thrift_sink.result_file_sink);
+        }
+        sink->reset(tmp_sink);
+        break;
+    }
+    case TDataSinkType::MEMORY_SCRATCH_SINK: {
         if (!thrift_sink.__isset.memory_scratch_sink) {
             return Status::InternalError("Missing data buffer sink.");
         }
@@ -86,6 +103,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         tmp_sink = new MemoryScratchSink(row_desc, output_exprs, thrift_sink.memory_scratch_sink);
         sink->reset(tmp_sink);
         break;
+    }
     case TDataSinkType::MYSQL_TABLE_SINK: {
 #ifdef DORIS_WITH_MYSQL
         if (!thrift_sink.__isset.mysql_table_sink) {
@@ -138,7 +156,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         break;
     }
 
-    default:
+    default: {
         std::stringstream error_msg;
         std::map<int, const char*>::const_iterator i =
                 _TDataSinkType_VALUES_TO_NAMES.find(thrift_sink.type);
@@ -151,6 +169,7 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         error_msg << str << " not implemented.";
         return Status::InternalError(error_msg.str());
     }
+    }
 
     if (sink->get() != NULL) {
         RETURN_IF_ERROR((*sink)->init(thrift_sink));
diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h
index e351427..8d1bb64 100644
--- a/be/src/exec/data_sink.h
+++ b/be/src/exec/data_sink.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "gen_cpp/DataSinks_types.h"
 #include "gen_cpp/Exprs_types.h"
+#include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/query_statistics.h"
 
@@ -75,7 +76,8 @@ public:
                                    const TPlanFragmentExecParams& params,
                                    const RowDescriptor& row_desc,
                                    bool is_vec,
-                                   boost::scoped_ptr<DataSink>* sink);
+                                   boost::scoped_ptr<DataSink>* sink,
+                                   DescriptorTbl& desc_tbl);
 
     // Returns the runtime profile for the sink.
     virtual RuntimeProfile* profile() = 0;
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index a8cb377..97545d1 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -40,7 +40,6 @@ S3Writer::S3Writer(const std::map<std::string, std::string>& properties, const s
         : _properties(properties),
           _path(path),
           _uri(path),
-          _sync_needed(false),
           _client(ClientFactory::instance().create(_properties)),
           _temp_file(std::make_shared<Aws::Utils::TempFile>(
                   std::ios_base::binary | std::ios_base::trunc | std::ios_base::in |
@@ -80,7 +79,6 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len, size_t* written_len)
     if (!_temp_file) {
         return Status::BufferAllocFailed("The internal temporary file is not writable.");
     }
-    _sync_needed = true;
     _temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
     if (!_temp_file->good()) {
         return Status::BufferAllocFailed("Could not append to the internal temporary file.");
@@ -101,9 +99,6 @@ Status S3Writer::_sync() {
     if (!_temp_file) {
         return Status::BufferAllocFailed("The internal temporary file is not writable.");
     }
-    if (!_sync_needed) {
-        return Status::OK();
-    }
     CHECK_S3_CLIENT(_client);
     Aws::S3::Model::PutObjectRequest request;
     request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
diff --git a/be/src/exec/s3_writer.h b/be/src/exec/s3_writer.h
index 663703f..09084ac 100644
--- a/be/src/exec/s3_writer.h
+++ b/be/src/exec/s3_writer.h
@@ -52,7 +52,6 @@ private:
     const std::map<std::string, std::string>& _properties;
     std::string _path;
     S3URI _uri;
-    bool _sync_needed;
     std::shared_ptr<Aws::S3::S3Client> _client;
     std::shared_ptr<Aws::Utils::TempFile> _temp_file;
 };
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 26a4f2d..c1ff746 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -40,6 +40,7 @@ set(RUNTIME_FILES
     raw_value.cpp
     raw_value_ir.cpp
     result_sink.cpp
+    result_file_sink.cpp
     result_buffer_mgr.cpp
     result_writer.cpp
     row_batch.cpp
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 777d6da..fa7c979 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -29,10 +29,6 @@
 #include "common/logging.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
-#include "gen_cpp/BackendService.h"
-#include "gen_cpp/PaloInternalService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "gen_cpp/internal_service.pb.h"
 #include "runtime/client_cache.h"
 #include "runtime/data_stream_mgr.h"
 #include "runtime/data_stream_recvr.h"
@@ -50,152 +46,42 @@
 #include "util/debug_util.h"
 #include "util/defer_op.h"
 #include "util/network_util.h"
-#include "util/ref_count_closure.h"
 #include "util/thrift_client.h"
 #include "util/thrift_util.h"
 
 namespace doris {
 
-// A channel sends data asynchronously via calls to transmit_data
-// to a single destination ipaddress/node.
-// It has a fixed-capacity buffer and allows the caller either to add rows to
-// that buffer individually (AddRow()), or circumvent the buffer altogether and send
-// TRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC
-// at any one time (ie, sending will block if the most recent rpc hasn't finished,
-// which allows the receiver node to throttle the sender by withholding acks).
-// *Not* thread-safe.
-class DataStreamSender::Channel {
-public:
-    // Create channel to send data to particular ipaddress/port/query/node
-    // combination. buffer_size is specified in bytes and a soft limit on
-    // how much tuple data is getting accumulated before being sent; it only applies
-    // when data is added via add_row() and not sent directly via send_batch().
-    Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
-            const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
-            PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
-            bool send_query_statistics_with_every_batch)
-            : _parent(parent),
-              _buffer_size(buffer_size),
-              _row_desc(row_desc),
-              _fragment_instance_id(fragment_instance_id),
-              _dest_node_id(dest_node_id),
-              _num_data_bytes_sent(0),
-              _packet_seq(0),
-              _need_close(false),
-              _be_number(0),
-              _brpc_dest_addr(brpc_dest),
-              _is_transfer_chain(is_transfer_chain),
-              _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
-        std::string localhost = BackendOptions::get_localhost();
-        _is_local =
-                _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
-        if (_is_local) {
-            LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
-        }
-    }
-
-    virtual ~Channel() {
-        if (_closure != nullptr && _closure->unref()) {
-            delete _closure;
-        }
-        // release this before request desctruct
-        _brpc_request.release_finst_id();
-    }
-
-    // Initialize channel.
-    // Returns OK if successful, error indication otherwise.
-    Status init(RuntimeState* state);
-
-    // Copies a single row into this channel's output buffer and flushes buffer
-    // if it reaches capacity.
-    // Returns error status if any of the preceding rpcs failed, OK otherwise.
-    Status add_row(TupleRow* row);
-
-    // Asynchronously sends a row batch.
-    // Returns the status of the most recently finished transmit_data
-    // rpc (or OK if there wasn't one that hasn't been reported yet).
-    // if batch is nullptr, send the eof packet
-    Status send_batch(PRowBatch* batch, bool eos = false);
-
-    Status send_local_batch(bool eos);
-
-    Status send_local_batch(RowBatch* batch, bool use_move);
-
-    // Flush buffered rows and close channel. This function don't wait the response
-    // of close operation, client should call close_wait() to finish channel's close.
-    // We split one close operation into two phases in order to make multiple channels
-    // can run parallel.
-    Status close(RuntimeState* state);
-
-    // Get close wait's response, to finish channel close operation.
-    Status close_wait(RuntimeState* state);
-
-    int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
-
-    PRowBatch* pb_batch() { return &_pb_batch; }
-
-    std::string get_fragment_instance_id_str() {
-        UniqueId uid(_fragment_instance_id);
-        return uid.to_string();
-    }
-
-    TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
-
-    bool is_local() { return _is_local; }
-
-private:
-    inline Status _wait_last_brpc() {
-        if (_closure == nullptr) return Status::OK();
-        auto cntl = &_closure->cntl;
-        brpc::Join(cntl->call_id());
-        if (cntl->Failed()) {
-            std::stringstream ss;
-            ss << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
-               << ", error_text=" << cntl->ErrorText()
-               << ", client: " << BackendOptions::get_localhost();
-            LOG(WARNING) << ss.str();
-            return Status::ThriftRpcError(ss.str());
-        }
-        return Status::OK();
+DataStreamSender::Channel::Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
+                               const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
+                               PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
+                               bool send_query_statistics_with_every_batch)
+       : _parent(parent),
+         _buffer_size(buffer_size),
+         _row_desc(row_desc),
+         _fragment_instance_id(fragment_instance_id),
+         _dest_node_id(dest_node_id),
+         _num_data_bytes_sent(0),
+         _packet_seq(0),
+         _need_close(false),
+         _be_number(0),
+         _brpc_dest_addr(brpc_dest),
+         _is_transfer_chain(is_transfer_chain),
+         _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
+    std::string localhost = BackendOptions::get_localhost();
+    _is_local =
+           _brpc_dest_addr.hostname == localhost && _brpc_dest_addr.port == config::brpc_port;
+    if (_is_local) {
+       LOG(INFO) << "will use local exechange, dest_node_id:" << _dest_node_id;
     }
+}
 
-private:
-    // Serialize _batch into _thrift_batch and send via send_batch().
-    // Returns send_batch() status.
-    Status send_current_batch(bool eos = false);
-    Status close_internal();
-
-    DataStreamSender* _parent;
-    int _buffer_size;
-
-    const RowDescriptor& _row_desc;
-    TUniqueId _fragment_instance_id;
-    PlanNodeId _dest_node_id;
-
-    // the number of TRowBatch.data bytes sent successfully
-    int64_t _num_data_bytes_sent;
-    int64_t _packet_seq;
-
-    // we're accumulating rows into this batch
-    boost::scoped_ptr<RowBatch> _batch;
-
-    bool _need_close;
-    int _be_number;
-
-    TNetworkAddress _brpc_dest_addr;
-
-    // TODO(zc): initused for brpc
-    PUniqueId _finst_id;
-    PRowBatch _pb_batch;
-    PTransmitDataParams _brpc_request;
-    PBackendService_Stub* _brpc_stub = nullptr;
-    RefCountClosure<PTransmitDataResult>* _closure = nullptr;
-    int32_t _brpc_timeout_ms = 500;
-    // whether the dest can be treated as query statistics transfer chain.
-    bool _is_transfer_chain;
-    bool _send_query_statistics_with_every_batch;
-    bool _is_local;
-};
+DataStreamSender::Channel::~Channel() {
+   if (_closure != nullptr && _closure->unref()) {
+       delete _closure;
+   }
+   // release this before request desctruct
+   _brpc_request.release_finst_id();
+}
 
 Status DataStreamSender::Channel::init(RuntimeState* state) {
     _be_number = state->be_number();
@@ -371,6 +257,16 @@ Status DataStreamSender::Channel::close_wait(RuntimeState* state) {
     return Status::OK();
 }
 
+DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc)
+        : _pool(pool),
+         _sender_id(sender_id),
+         _row_desc(row_desc),
+         _serialize_batch_timer(NULL),
+         _bytes_sent_counter(NULL),
+         _local_bytes_send_counter(NULL),
+         _current_pb_batch(&_pb_batch1) {
+}
+
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
                                    const TDataStreamSink& sink,
                                    const std::vector<TPlanFragmentDestination>& destinations,
diff --git a/be/src/runtime/data_stream_sender.h b/be/src/runtime/data_stream_sender.h
index 5860403..7193a14 100644
--- a/be/src/runtime/data_stream_sender.h
+++ b/be/src/runtime/data_stream_sender.h
@@ -26,7 +26,15 @@
 #include "common/status.h"
 #include "exec/data_sink.h"
 #include "gen_cpp/data.pb.h" // for PRowBatch
+#include "gen_cpp/BackendService.h"
+#include "gen_cpp/Types_types.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/internal_service.pb.h"
 #include "util/runtime_profile.h"
+#include "util/ref_count_closure.h"
+#include "util/uid_util.h"
+#include "service/brpc.h"
+#include "service/backend_options.h"
 
 namespace doris {
 
@@ -50,6 +58,7 @@ class MemTracker;
 // across channels.
 class DataStreamSender : public DataSink {
 public:
+    DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc);
     // Construct a sender according to the output specification (sink),
     // sending to the given destinations.
     // Per_channel_buffer_size is the buffer size allocated to each channel
@@ -97,9 +106,132 @@ public:
 
     RuntimeState* state() { return _state; }
 
-private:
-    class Channel;
+protected:
+    const RowDescriptor& _row_desc;
+    // A channel sends data asynchronously via calls to transmit_data
+    // to a single destination ipaddress/node.
+    // It has a fixed-capacity buffer and allows the caller either to add rows to
+    // that buffer individually (AddRow()), or circumvent the buffer altogether and send
+    // TRowBatches directly (SendBatch()). Either way, there can only be one in-flight RPC
+    // at any one time (ie, sending will block if the most recent rpc hasn't finished,
+    // which allows the receiver node to throttle the sender by withholding acks).
+    // *Not* thread-safe.
+    class Channel {
+    public:
+        Channel(DataStreamSender* parent, const RowDescriptor& row_desc,
+                const TNetworkAddress& brpc_dest, const TUniqueId& fragment_instance_id,
+                PlanNodeId dest_node_id, int buffer_size, bool is_transfer_chain,
+                bool send_query_statistics_with_every_batch);
+        ~Channel();
+        // Initialize channel.
+        // Returns OK if successful, error indication otherwise.
+        Status init(RuntimeState* state);
+
+        // Copies a single row into this channel's output buffer and flushes buffer
+        // if it reaches capacity.
+        // Returns error status if any of the preceding rpcs failed, OK otherwise.
+        Status add_row(TupleRow* row);
+
+        // Asynchronously sends a row batch.
+        // Returns the status of the most recently finished transmit_data
+        // rpc (or OK if there wasn't one that hasn't been reported yet).
+        // if batch is nullptr, send the eof packet
+        Status send_batch(PRowBatch* batch, bool eos = false);
+
+        Status send_local_batch(bool eos);
+
+        Status send_local_batch(RowBatch* batch, bool use_move);
+
+        // Flush buffered rows and close channel. This function don't wait the response
+        // of close operation, client should call close_wait() to finish channel's close.
+        // We split one close operation into two phases in order to make multiple channels
+        // can run parallel.
+        Status close(RuntimeState* state);
+
+        // Get close wait's response, to finish channel close operation.
+        Status close_wait(RuntimeState* state);
+
+        int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
+
+        PRowBatch* pb_batch() { return &_pb_batch; }
+
+        std::string get_fragment_instance_id_str() {
+            UniqueId uid(_fragment_instance_id);
+            return uid.to_string();
+        }
+
+        TUniqueId get_fragment_instance_id() { return _fragment_instance_id; }
+
+        bool is_local() { return _is_local; }
+
+        inline Status _wait_last_brpc() {
+                if (_closure == nullptr) return Status::OK();
+                auto cntl = &_closure->cntl;
+                brpc::Join(cntl->call_id());
+                if (cntl->Failed()) {
+                    std::stringstream ss;
+                    ss << "failed to send brpc batch, error=" << berror(cntl->ErrorCode())
+                       << ", error_text=" << cntl->ErrorText()
+                       << ", client: " << BackendOptions::get_localhost();
+                    LOG(WARNING) << ss.str();
+                    return Status::ThriftRpcError(ss.str());
+                }
+                return Status::OK();
+        }
+        // Serialize _batch into _thrift_batch and send via send_batch().
+        // Returns send_batch() status.
+        Status send_current_batch(bool eos = false);
+        Status close_internal();
+
+        DataStreamSender* _parent;
+        int _buffer_size;
+
+        const RowDescriptor& _row_desc;
+        TUniqueId _fragment_instance_id;
+        PlanNodeId _dest_node_id;
+
+        // the number of TRowBatch.data bytes sent successfully
+        int64_t _num_data_bytes_sent;
+        int64_t _packet_seq;
+
+        // we're accumulating rows into this batch
+        boost::scoped_ptr<RowBatch> _batch;
+
+        bool _need_close;
+        int _be_number;
+
+        TNetworkAddress _brpc_dest_addr;
+
+        // TODO(zc): initused for brpc
+        PUniqueId _finst_id;
+        PRowBatch _pb_batch;
+        PTransmitDataParams _brpc_request;
+        PBackendService_Stub* _brpc_stub = nullptr;
+        RefCountClosure<PTransmitDataResult>* _closure = nullptr;
+        int32_t _brpc_timeout_ms = 500;
+        // whether the dest can be treated as query statistics transfer chain.
+        bool _is_transfer_chain;
+        bool _send_query_statistics_with_every_batch;
+        bool _is_local;
+    };
 
+    RuntimeProfile* _profile; // Allocated from _pool
+    PRowBatch* _current_pb_batch;
+    std::shared_ptr<MemTracker> _mem_tracker;
+    ObjectPool* _pool;
+    // Sender instance id, unique within a fragment.
+    int _sender_id;
+    RuntimeProfile::Counter* _serialize_batch_timer;
+    RuntimeProfile::Counter* _bytes_sent_counter;
+    // Used to counter send bytes under local data exchange
+    RuntimeProfile::Counter* _local_bytes_send_counter;
+    RuntimeProfile::Counter* _uncompressed_bytes_counter;
+    RuntimeState* _state;
+
+    std::vector<Channel*> _channels;
+    std::vector<std::shared_ptr<Channel>> _channel_shared_ptrs;
+
+private:
     Status compute_range_part_code(RuntimeState* state, TupleRow* row, size_t* hash_value,
                                    bool* ignore);
 
@@ -110,13 +242,6 @@ private:
     Status process_distribute(RuntimeState* state, TupleRow* row, const PartitionInfo* part,
                               size_t* hash_val);
 
-    // Sender instance id, unique within a fragment.
-    int _sender_id;
-
-    RuntimeState* _state;
-    ObjectPool* _pool;
-    const RowDescriptor& _row_desc;
-
     int _current_channel_idx; // index of current channel to send to if _random == true
 
     TPartitionType::type _part_type;
@@ -126,31 +251,18 @@ private:
     // one while the other one is still being sent
     PRowBatch _pb_batch1;
     PRowBatch _pb_batch2;
-    PRowBatch* _current_pb_batch = nullptr;
 
     std::vector<ExprContext*> _partition_expr_ctxs; // compute per-row partition values
 
-    std::vector<Channel*> _channels;
-    std::vector<std::shared_ptr<Channel>> _channel_shared_ptrs;
-
     // map from range value to partition_id
     // sorted in ascending orderi by range for binary search
     std::vector<PartitionInfo*> _partition_infos;
 
-    RuntimeProfile* _profile; // Allocated from _pool
-    RuntimeProfile::Counter* _serialize_batch_timer;
-    RuntimeProfile::Counter* _bytes_sent_counter;
-    RuntimeProfile::Counter* _uncompressed_bytes_counter;
     RuntimeProfile::Counter* _ignore_rows;
 
-    std::shared_ptr<MemTracker> _mem_tracker;
-
     // Throughput per total time spent in sender
     RuntimeProfile::Counter* _overall_throughput;
 
-    // Used to counter send bytes under local data exchange
-    RuntimeProfile::Counter* _local_bytes_send_counter;
-
     // Identifier of the destination plan node.
     PlanNodeId _dest_node_id;
 };
diff --git a/be/src/runtime/file_result_writer.cpp b/be/src/runtime/file_result_writer.cpp
index 9cb5751..4edfdb5 100644
--- a/be/src/runtime/file_result_writer.cpp
+++ b/be/src/runtime/file_result_writer.cpp
@@ -20,14 +20,17 @@
 #include "exec/broker_writer.h"
 #include "exec/local_file_writer.h"
 #include "exec/parquet_writer.h"
+#include "exec/s3_writer.h"
 #include "exprs/expr.h"
 #include "exprs/expr_context.h"
 #include "gen_cpp/PaloInternalService_types.h"
 #include "runtime/buffer_control_block.h"
 #include "runtime/primitive_type.h"
+#include "runtime/raw_value.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
 #include "runtime/tuple_row.h"
+#include "runtime/string_value.h"
 #include "service/backend_options.h"
 #include "util/date_func.h"
 #include "util/file_utils.h"
@@ -39,13 +42,39 @@ namespace doris {
 
 const size_t FileResultWriter::OUTSTREAM_BUFFER_SIZE_BYTES = 1024 * 1024;
 
+// deprecated
 FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
                                    const std::vector<ExprContext*>& output_expr_ctxs,
                                    RuntimeProfile* parent_profile, BufferControlBlock* sinker)
         : _file_opts(file_opts),
           _output_expr_ctxs(output_expr_ctxs),
           _parent_profile(parent_profile),
-          _sinker(sinker) {}
+          _sinker(sinker) {
+        if (_file_opts->is_local_file) {
+            _storage_type = TStorageBackendType::LOCAL;
+        } else {
+            _storage_type = TStorageBackendType::BROKER;
+        }
+        // The new file writer needs to use fragment instance id as part of the file prefix.
+        // But during the upgrade process, the old version of fe will be called to the new version of be,
+        // resulting in no such attribute. So we need a mock here.
+        _fragment_instance_id.hi = 12345678987654321;
+        _fragment_instance_id.lo = 98765432123456789;
+    }
+
+FileResultWriter::FileResultWriter(const ResultFileOptions* file_opts,
+                                   const TStorageBackendType::type storage_type,
+                                   const TUniqueId fragment_instance_id,
+                                   const std::vector<ExprContext*>& output_expr_ctxs,
+                                   RuntimeProfile* parent_profile, BufferControlBlock* sinker,
+                                   RowBatch* output_batch)
+        : _file_opts(file_opts),
+          _storage_type(storage_type),
+          _fragment_instance_id(fragment_instance_id),
+          _output_expr_ctxs(output_expr_ctxs),
+          _parent_profile(parent_profile),
+          _sinker(sinker),
+          _output_batch(output_batch) {}
 
 FileResultWriter::~FileResultWriter() {
     _close_file_writer(true);
@@ -78,7 +107,7 @@ Status FileResultWriter::_get_success_file_name(std::string* file_name) {
     std::stringstream ss;
     ss << _file_opts->file_path << _file_opts->success_file_name;
     *file_name = ss.str();
-    if (_file_opts->is_local_file) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
         // For local file writer, the file_path is a local dir.
         // Here we do a simple security verification by checking whether the file exists.
         // Because the file path is currently arbitrarily specified by the user,
@@ -100,12 +129,14 @@ Status FileResultWriter::_create_next_file_writer() {
 }
 
 Status FileResultWriter::_create_file_writer(const std::string& file_name) {
-    if (_file_opts->is_local_file) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
         _file_writer = new LocalFileWriter(file_name, 0 /* start offset */);
-    } else {
+    } else if (_storage_type == TStorageBackendType::BROKER){
         _file_writer =
                 new BrokerWriter(_state->exec_env(), _file_opts->broker_addresses,
                                  _file_opts->broker_properties, file_name, 0 /*start offset*/);
+    } else if (_storage_type == TStorageBackendType::S3) {
+        _file_writer =  new S3Writer(_file_opts->broker_properties, file_name, 0 /* offset */);
     }
     RETURN_IF_ERROR(_file_writer->open());
     switch (_file_opts->file_format) {
@@ -126,12 +157,13 @@ Status FileResultWriter::_create_file_writer(const std::string& file_name) {
     return Status::OK();
 }
 
-// file name format as: my_prefix_0.csv
+// file name format as: my_prefix_{fragment_instance_id}_0.csv
 Status FileResultWriter::_get_next_file_name(std::string* file_name) {
     std::stringstream ss;
-    ss << _file_opts->file_path << (_file_idx++) << "." << _file_format_to_name();
+    ss << _file_opts->file_path << print_id(_fragment_instance_id)
+       << "_" << (_file_idx++) << "." << _file_format_to_name();
     *file_name = ss.str();
-    if (_file_opts->is_local_file) {
+    if (_storage_type == TStorageBackendType::LOCAL) {
         // For local file writer, the file_path is a local dir.
         // Here we do a simple security verification by checking whether the file exists.
         // Because the file path is currently arbitrarily specified by the user,
@@ -146,6 +178,22 @@ Status FileResultWriter::_get_next_file_name(std::string* file_name) {
     return Status::OK();
 }
 
+// file url format as:
+// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
+// S3: {file_path}{fragment_instance_id}_
+// BROKER: {file_path}{fragment_instance_id}_
+
+Status FileResultWriter::_get_file_url(std::string* file_url) {
+    std::stringstream ss;
+    if (_storage_type == TStorageBackendType::LOCAL) {
+        ss << "file:///" << BackendOptions::get_localhost();
+    }
+    ss << _file_opts->file_path;
+    ss << print_id(_fragment_instance_id) << "_";
+    *file_url = ss.str();
+    return Status::OK();
+}
+
 std::string FileResultWriter::_file_format_to_name() {
     switch (_file_opts->file_format) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
@@ -355,7 +403,11 @@ Status FileResultWriter::_close_file_writer(bool done, bool only_close) {
             // write success file, just need to touch an empty file
             RETURN_IF_ERROR(_create_success_file());
         }
-        RETURN_IF_ERROR(_send_result());
+        if (_output_batch == nullptr) {
+            RETURN_IF_ERROR(_send_result());
+        } else {
+            RETURN_IF_ERROR(_fill_result_batch());
+        }
     }
     return Status::OK();
 }
@@ -374,8 +426,9 @@ Status FileResultWriter::_send_result() {
     row_buffer.push_int(_file_idx);                         // file number
     row_buffer.push_bigint(_written_rows_counter->value()); // total rows
     row_buffer.push_bigint(_written_data_bytes->value());   // file size
-    std::string localhost = BackendOptions::get_localhost();
-    row_buffer.push_string(localhost.c_str(), localhost.length()); // url
+    std::string file_url;
+    _get_file_url(&file_url);
+    row_buffer.push_string(file_url.c_str(), file_url.length()); // url
 
     std::unique_ptr<TFetchDataResult> result = std::make_unique<TFetchDataResult>();
     result->result_batch.rows.resize(1);
@@ -384,6 +437,36 @@ Status FileResultWriter::_send_result() {
     return Status::OK();
 }
 
+Status FileResultWriter::_fill_result_batch() {
+    if (_is_result_sent) {
+        return Status::OK();
+    }
+    _is_result_sent = true;
+
+    TupleDescriptor* tuple_desc = _output_batch->row_desc().tuple_descriptors()[0];
+    Tuple* tuple = (Tuple*)_output_batch->tuple_data_pool()->allocate(tuple_desc->byte_size());
+    _output_batch->get_row(_output_batch->add_row())->set_tuple(0, tuple);
+    memset(tuple, 0, tuple_desc->byte_size());
+
+    MemPool* tuple_pool = _output_batch->tuple_data_pool();
+    RawValue::write(&_file_idx, tuple, tuple_desc->slots()[0], tuple_pool);
+    int64_t written_rows = _written_rows_counter->value();
+    RawValue::write(&written_rows, tuple, tuple_desc->slots()[1], tuple_pool);
+    int64_t written_data_bytes = _written_data_bytes->value();
+    RawValue::write(&written_data_bytes, tuple, tuple_desc->slots()[2], tuple_pool);
+
+    StringValue* url_str_val =
+        reinterpret_cast<StringValue*>(tuple->get_slot(tuple_desc->slots()[3]->tuple_offset()));
+    std::string file_url;
+    _get_file_url(&file_url);
+    url_str_val->ptr = (char*)_output_batch->tuple_data_pool()->allocate(file_url.length());
+    url_str_val->len = file_url.length();
+    memcpy(url_str_val->ptr, file_url.c_str(), url_str_val->len);
+    
+    _output_batch->commit_last_row();
+    return Status::OK();
+}
+
 Status FileResultWriter::close() {
     // the following 2 profile "_written_rows_counter" and "_writer_close_timer"
     // must be outside the `_close_file_writer()`.
@@ -392,7 +475,7 @@ Status FileResultWriter::close() {
     // so does the profile in RuntimeState.
     COUNTER_SET(_written_rows_counter, _written_rows);
     SCOPED_TIMER(_writer_close_timer);
-    return _close_file_writer(true);
+    return _close_file_writer(true, false);
 }
 
 } // namespace doris
diff --git a/be/src/runtime/file_result_writer.h b/be/src/runtime/file_result_writer.h
index f00a09f..a38ff9f 100644
--- a/be/src/runtime/file_result_writer.h
+++ b/be/src/runtime/file_result_writer.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "gen_cpp/DataSinks_types.h"
+#include "gen_cpp/Types_types.h"
 #include "runtime/result_writer.h"
 #include "runtime/runtime_state.h"
 
@@ -31,6 +32,7 @@ class RuntimeProfile;
 class TupleRow;
 
 struct ResultFileOptions {
+    // deprecated
     bool is_local_file;
     std::string file_path;
     TFileFormatType::type file_format;
@@ -79,6 +81,13 @@ public:
                      const std::vector<ExprContext*>& output_expr_ctxs,
                      RuntimeProfile* parent_profile,
                      BufferControlBlock* sinker);
+    FileResultWriter(const ResultFileOptions* file_option,
+                     const TStorageBackendType::type storage_type,
+                     const TUniqueId fragment_instance_id,
+                     const std::vector<ExprContext*>& output_expr_ctxs,
+                     RuntimeProfile* parent_profile,
+                     BufferControlBlock* sinker,
+                     RowBatch* output_batch);
     virtual ~FileResultWriter();
 
     virtual Status init(RuntimeState* state) override;
@@ -104,6 +113,7 @@ private:
     // get next export file name
     Status _get_next_file_name(std::string* file_name);
     Status _get_success_file_name(std::string* file_name);
+    Status _get_file_url(std::string* file_url);
     std::string _file_format_to_name();
     // close file writer, and if !done, it will create new writer for next file.
     // if only_close is true, this method will just close the file writer and return.
@@ -112,10 +122,14 @@ private:
     Status _create_new_file_if_exceed_size();
     // send the final statistic result
     Status _send_result();
+    // save result into batch rather than send it
+    Status _fill_result_batch();
 
 private:
     RuntimeState* _state; // not owned, set when init
     const ResultFileOptions* _file_opts;
+    TStorageBackendType::type _storage_type;
+    TUniqueId _fragment_instance_id;
     const std::vector<ExprContext*>& _output_expr_ctxs;
 
     // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter.
@@ -152,6 +166,7 @@ private:
     RuntimeProfile::Counter* _written_data_bytes = nullptr;
 
     BufferControlBlock* _sinker;
+    RowBatch* _output_batch;
     // set to true if the final statistic result is sent
     bool _is_result_sent = false;
 };
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index e0ea04d..37d162e 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -200,7 +200,8 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     if (request.fragment.__isset.output_sink) {
         RETURN_IF_ERROR(DataSink::create_data_sink(obj_pool(), request.fragment.output_sink,
                                                    request.fragment.output_exprs, params,
-                                                   row_desc(), runtime_state()->enable_vectorized_exec(), &_sink));
+                                                   row_desc(), runtime_state()->enable_vectorized_exec(),
+                                                   &_sink, *desc_tbl));
         RETURN_IF_ERROR(_sink->prepare(runtime_state()));
 
         RuntimeProfile* sink_profile = _sink->profile();
diff --git a/be/src/runtime/result_file_sink.cpp b/be/src/runtime/result_file_sink.cpp
new file mode 100644
index 0000000..090fe6f
--- /dev/null
+++ b/be/src/runtime/result_file_sink.cpp
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/result_file_sink.h"
+
+#include "common/config.h"
+#include "exprs/expr.h"
+#include "runtime/buffer_control_block.h"
+#include "runtime/exec_env.h"
+#include "runtime/file_result_writer.h"
+#include "runtime/mem_tracker.h"
+#include "runtime/mysql_result_writer.h"
+#include "runtime/result_buffer_mgr.h"
+#include "runtime/row_batch.h"
+#include "runtime/runtime_state.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+ResultFileSink::ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_output_expr,
+                       const TResultFileSink& sink)
+         : DataStreamSender(nullptr, 0, row_desc), _t_output_expr(t_output_expr){
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = true;
+
+    _name = "ResultFileSink";
+}
+
+ResultFileSink::ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& t_output_expr,
+                       const TResultFileSink& sink,
+                       const std::vector<TPlanFragmentDestination>& destinations,
+                       ObjectPool* pool, int sender_id, DescriptorTbl& descs)
+        : DataStreamSender(pool, sender_id, row_desc), _t_output_expr(t_output_expr),
+         _output_row_descriptor(descs.get_tuple_descriptor(sink.output_tuple_id), false) {
+    CHECK(sink.__isset.file_options);
+    _file_opts.reset(new ResultFileOptions(sink.file_options));
+    CHECK(sink.__isset.storage_backend_type);
+    _storage_type = sink.storage_backend_type;
+    _is_top_sink = false;
+    DCHECK_EQ(destinations.size(), 1);
+    _channel_shared_ptrs.emplace_back(
+            new Channel(this, _output_row_descriptor, destinations[0].brpc_server,
+                        destinations[0].fragment_instance_id,
+                        sink.dest_node_id, _buf_size, true, true));
+    _channels.push_back(_channel_shared_ptrs.back().get());
+
+    _name = "ResultFileSink";
+}
+
+ResultFileSink::~ResultFileSink() {
+    if (_output_batch != nullptr) {
+        delete _output_batch;
+    }
+}
+
+Status ResultFileSink::prepare_exprs(RuntimeState* state) {
+    // From the thrift expressions create the real exprs.
+    RETURN_IF_ERROR(Expr::create_expr_trees(state->obj_pool(), _t_output_expr, &_output_expr_ctxs));
+    // Prepare the exprs to run.
+    RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _row_desc, _expr_mem_tracker));
+    return Status::OK();
+}
+
+Status ResultFileSink::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(DataSink::prepare(state));
+    std::stringstream title;
+    title << "DataBufferSender (dst_fragment_instance_id="
+          << print_id(state->fragment_instance_id()) << ")";
+    // create profile
+    _profile = state->obj_pool()->add(new RuntimeProfile(title.str()));
+    // prepare output_expr
+    RETURN_IF_ERROR(prepare_exprs(state));
+
+    CHECK(_file_opts.get() != nullptr);
+    if (_is_top_sink) {
+        // create sender
+        RETURN_IF_ERROR(state->exec_env()->result_mgr()->create_sender(state->fragment_instance_id(),
+                                                                       _buf_size, &_sender));
+        // create writer
+        _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _storage_type,
+                                                         state->fragment_instance_id(), _output_expr_ctxs,
+                                                         _profile, _sender.get(), nullptr));
+    } else {
+        // init channel
+        _profile = _pool->add(new RuntimeProfile(title.str()));
+        _state = state;
+        _serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
+        _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
+        _local_bytes_send_counter = ADD_COUNTER(profile(), "LocalBytesSent", TUnit::BYTES);
+        _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
+        _mem_tracker = MemTracker::CreateTracker(
+                _profile, -1, "ResultFileSink:" + print_id(state->fragment_instance_id()),
+                state->instance_mem_tracker());
+        for (int i = 0; i < _channels.size(); ++i) {
+            RETURN_IF_ERROR(_channels[i]->init(state));
+        }
+        // create writer
+        _output_batch = new RowBatch(_output_row_descriptor, 1024, _mem_tracker.get());
+        _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _storage_type,
+                                                         state->fragment_instance_id(), _output_expr_ctxs,
+                                                         _profile, nullptr, _output_batch));
+
+    }
+    RETURN_IF_ERROR(_writer->init(state));
+    return Status::OK();
+}
+
+Status ResultFileSink::open(RuntimeState* state) {
+    return Expr::open(_output_expr_ctxs, state);
+}
+
+Status ResultFileSink::send(RuntimeState* state, RowBatch* batch) {
+    RETURN_IF_ERROR(_writer->append_row_batch(batch));
+    return Status::OK();
+}
+
+Status ResultFileSink::close(RuntimeState* state, Status exec_status) {
+    if (_closed) {
+        return Status::OK();
+    }
+
+    Status final_status = exec_status;
+    // close the writer
+    if (_writer) {
+        Status st = _writer->close();
+        if (!st.ok() && exec_status.ok()) {
+            // close file writer failed, should return this error to client
+            final_status = st;
+        }
+    }
+    if (_is_top_sink) {
+        // close sender, this is normal path end
+        if (_sender) {
+            _sender->update_num_written_rows(_writer->get_written_rows());
+            _sender->close(final_status);
+        }
+        state->exec_env()->result_mgr()->cancel_at_time(
+                time(NULL) + config::result_buffer_cancelled_interval_time,
+                state->fragment_instance_id());
+    } else {
+        RETURN_IF_ERROR(serialize_batch(_output_batch, _current_pb_batch, _channels.size()));
+        for (auto channel : _channels) {
+            RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
+        }
+        Status final_st = Status::OK();
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        // wait all channels to finish
+        for (int i = 0; i < _channels.size(); ++i) {
+            Status st = _channels[i]->close_wait(state);
+            if (!st.ok() && final_st.ok()) {
+                final_st = st;
+            }
+        }
+        // release row batch
+        _output_batch->reset();
+    }
+
+    Expr::close(_output_expr_ctxs, state);
+
+    _closed = true;
+    return Status::OK();
+}
+
+void ResultFileSink::set_query_statistics(std::shared_ptr<QueryStatistics> statistics) {
+    if (_is_top_sink) {
+        _sender->set_query_statistics(statistics);
+    } else {
+        _query_statistics = statistics;
+    }
+}
+
+} // namespace doris
diff --git a/be/src/runtime/result_file_sink.h b/be/src/runtime/result_file_sink.h
new file mode 100644
index 0000000..edeecf6
--- /dev/null
+++ b/be/src/runtime/result_file_sink.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_RUNTIME_RESULT_FILE_SINK_H
+#define DORIS_BE_RUNTIME_RESULT_FILE_SINK_H
+
+#include "common/status.h"
+#include "runtime/data_stream_sender.h"
+#include "runtime/descriptors.h"
+#include "gen_cpp/PaloInternalService_types.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "gen_cpp/Types_types.h"
+
+namespace doris {
+
+class RowBatch;
+class ObjectPool;
+class RuntimeState;
+class RuntimeProfile;
+class BufferControlBlock;
+class ExprContext;
+class ResultWriter;
+class MemTracker;
+class ResultFileOptions;
+
+class ResultFileSink : public DataStreamSender {
+public:
+    // construct a buffer for the result need send to coordinator.
+    // row_desc used for convert RowBatch to TRowBatch
+    // buffer_size is the buffer size allocated to each query
+    ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
+               const TResultFileSink& sink);
+    ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>& select_exprs,
+               const TResultFileSink& sink,
+               const std::vector<TPlanFragmentDestination>& destinations,
+               ObjectPool* pool, int sender_id, DescriptorTbl& descs);
+    virtual ~ResultFileSink();
+    virtual Status prepare(RuntimeState* state);
+    virtual Status open(RuntimeState* state);
+    // send data in 'batch' to this backend stream mgr
+    // Blocks until all rows in batch are placed in the buffer
+    virtual Status send(RuntimeState* state, RowBatch* batch);
+    // Flush all buffered data and close all existing channels to destination
+    // hosts. Further send() calls are illegal after calling close().
+    virtual Status close(RuntimeState* state, Status exec_status);
+    virtual RuntimeProfile* profile() { return _profile; }
+
+    void set_query_statistics(std::shared_ptr<QueryStatistics> statistics) override;
+
+private:
+    Status prepare_exprs(RuntimeState* state);
+    // set file options when sink type is FILE
+    std::unique_ptr<ResultFileOptions> _file_opts;
+    TStorageBackendType::type _storage_type;
+
+    // Owned by the RuntimeState.
+    const std::vector<TExpr>& _t_output_expr;
+    std::vector<ExprContext*> _output_expr_ctxs;
+    RowDescriptor _output_row_descriptor;
+
+    boost::shared_ptr<BufferControlBlock> _sender;
+    boost::shared_ptr<ResultWriter> _writer;
+    RowBatch* _output_batch;
+    int _buf_size = 1024;            // Allocated from _pool
+    bool _is_top_sink = true;
+
+};
+
+} // namespace doris
+#endif
diff --git a/be/src/runtime/result_sink.cpp b/be/src/runtime/result_sink.cpp
index e99f5c0..3a04138 100644
--- a/be/src/runtime/result_sink.cpp
+++ b/be/src/runtime/result_sink.cpp
@@ -78,6 +78,7 @@ Status ResultSink::prepare(RuntimeState* state) {
         _writer.reset(new (std::nothrow)
                               MysqlResultWriter(_sender.get(), _output_expr_ctxs, _profile));
         break;
+    // deprecated
     case TResultSinkType::FILE:
         CHECK(_file_opts.get() != nullptr);
         _writer.reset(new (std::nothrow) FileResultWriter(_file_opts.get(), _output_expr_ctxs,
diff --git a/docs/en/administrator-guide/outfile.md b/docs/en/administrator-guide/outfile.md
index 7f31b53..1e9d97c 100644
--- a/docs/en/administrator-guide/outfile.md
+++ b/docs/en/administrator-guide/outfile.md
@@ -30,15 +30,13 @@ This document describes how to use the `SELECT INTO OUTFILE` command to export q
 
 ## Syntax
 
-The `SELECT INTO OUTFILE` statement can export the query results to a file. Currently, it only supports exporting to remote storage such as HDFS, S3, BOS and COS(Tencent Cloud) through the Broker process. The syntax is as follows:
+The `SELECT INTO OUTFILE` statement can export the query results to a file. Currently, it supports exporting to remote storage such as HDFS, S3, BOS and COS (Tencent Cloud) through the Broker process, or directly through the S3 protocol. The syntax is as follows:
 
 ```
 query_stmt
 INTO OUTFILE "file_path"
 [format_as]
-WITH BROKER `broker_name`
-[broker_properties]
-[other_properties]
+[properties]
 ```
 
 * `file_path`
@@ -48,9 +46,9 @@ WITH BROKER `broker_name`
     The final file name will be assembled as `my_file_`, file seq no and the format suffix. File seq no starts from 0, determined by the number of split.
     
     ```
-    my_file_0.csv
-    my_file_1.csv
-    my_file_2.csv
+    my_file_abcdefg_0.csv
+    my_file_abcdefg_1.csv
+    my_file_abcdefg_2.csv
     ```
 
 * `[format_as]`
@@ -63,11 +61,16 @@ WITH BROKER `broker_name`
 
 * `[properties]`
 
-    Specify the relevant attributes. Currently only export through Broker process is supported. Broker related attributes need to be prefixed with `broker.`. For details, please refer to [Broker](./broker.html).
+    Specify the relevant properties. Currently, exporting through the Broker process or through the S3 protocol is supported.
+
+    + Broker-related properties need to be prefixed with `broker.`. For details, please refer to the [Broker Document](./broker.html).
+    + For the S3 protocol, the S3 configuration can be specified directly.
 
     ```
     PROPERTIES
     ("broker.prop_key" = "broker.prop_val", ...)
+    or 
+    ("AWS_ENDPOINT" = "xxx", ...)
     ```
 
     Other properties
@@ -83,6 +86,55 @@ WITH BROKER `broker_name`
     * `line_delimiter`: Line delimiter, only applicable to CSV format. The default is `\n`.
     * `max_file_size`: The max size of a single file. Default is 1GB. Range from 5MB to 2GB. Files exceeding this size will be splitted.
 
+## Concurrent export
+
+By default, the query result set is exported non-concurrently, that is, from a single point. If you want the result set to be exported concurrently, the following conditions must be met:
+
+1. The session variable 'enable_parallel_outfile' is enabled to allow concurrent export: ```set enable_parallel_outfile = true;```
+2. The export uses the S3 protocol instead of a broker
+3. The query itself supports concurrent export; for example, the top level does not contain single-point nodes such as sort. (An example of a query that cannot be exported concurrently is given later.)
+
+If the above three conditions are met, concurrent export of the query result set is triggered. Concurrency = ```be_instance_num * parallel_fragment_exec_instance_num```
+
+### How to verify that the result set is exported concurrently
+
+After enabling concurrent export through the session variable, you can verify whether the current query can be exported concurrently as follows.
+
+```
+explain select xxx from xxx where xxx into outfile "s3://xxx" format as csv properties ("AWS_ENDPOINT" = "xxx", ...);
+```
+
+After explaining the query, Doris returns the plan of the query. If ```RESULT FILE SINK``` appears in ```PLAN FRAGMENT 1```, concurrent export has been enabled successfully.
+If ```RESULT FILE SINK``` appears in ```PLAN FRAGMENT 0```, the current query cannot be exported concurrently (it does not satisfy all three conditions for concurrent export).
+
+```
+Planning example for concurrent export:
++-----------------------------------------------------------------------------+
+| Explain String                                                              |
++-----------------------------------------------------------------------------+
+| PLAN FRAGMENT 0                                                             |
+|  OUTPUT EXPRS:<slot 2> | <slot 3> | <slot 4> | <slot 5>                     |
+|   PARTITION: UNPARTITIONED                                                  |
+|                                                                             |
+|   RESULT SINK                                                               |
+|                                                                             |
+|   1:EXCHANGE                                                                |
+|                                                                             |
+| PLAN FRAGMENT 1                                                             |
+|  OUTPUT EXPRS:`k1` + `k2`                                                   |
+|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`multi_tablet`.`k1`   |
+|                                                                             |
+|   RESULT FILE SINK                                                          |
+|   FILE PATH: s3://ml-bd-repo/bpit_test/outfile_1951_                        |
+|   STORAGE TYPE: S3                                                          |
+|                                                                             |
+|   0:OlapScanNode                                                            |
+|      TABLE: multi_tablet                                                    |
++-----------------------------------------------------------------------------+
+```
+
+## Usage example
+
 1. Example 1
 
     Export simple query results to the file `hdfs:/path/to/result.txt`. Specify the export format as CSV. Use `my_broker` and set kerberos authentication information. Specify the column separator as `,` and the line delimiter as `\n`.
@@ -187,6 +239,46 @@ WITH BROKER `broker_name`
     1. Paths that do not exist are automatically created.
     2. These parameters (access.key / secret.key / endpoint) need to be confirmed with `Tencent Cloud COS`. In particular, the endpoint value does not need to include the bucket_name.
 
+5. Example 5
+
+    Export to BOS using the S3 protocol, with concurrent export enabled.
+
+    ```
+    set enable_parallel_outfile = true;
+    select k1 from tb1 limit 1000
+    into outfile "s3://my_bucket/export/my_file_"
+    format as csv
+    properties
+    (
+        "AWS_ENDPOINT" = "http://s3.bd.bcebos.com",
+        "AWS_ACCESS_KEY" = "xxxx",
+        "AWS_SECRET_KEY" = "xxx",
+        "AWS_REGION" = "bd"
+    )
+    ```
+
+    The final generated file prefix is `my_file_{fragment_instance_id}_`.
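+
+    For illustration only (the instance ids below are made up), two fragment instances exporting concurrently might produce files such as:
+
+    ```
+    my_file_d9c3ff7a1b2c_0.csv
+    my_file_d9c3ff7a1b2c_1.csv
+    my_file_e0a1b2c3d4e5_0.csv
+    ```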
+
+6. Example 6
+
+    Export to BOS using the S3 protocol, with the concurrent-export session variable enabled.
+
+    ```
+    set enable_parallel_outfile = true;
+    select k1 from tb1 order by k1 limit 1000
+    into outfile "s3://my_bucket/export/my_file_"
+    format as csv
+    properties
+    (
+        "AWS_ENDPOINT" = "http://s3.bd.bcebos.com",
+        "AWS_ACCESS_KEY" = "xxxx",
+        "AWS_SECRET_KEY" = "xxx",
+        "AWS_REGION" = "bd"
+    )
+    ```
+
+    **However, because the query contains a top-level sort node, it cannot be exported concurrently even though the concurrent-export session variable is enabled.**
+
 ## Return result
 
 The command is a synchronous command. When the command returns, it means the operation has finished.
@@ -196,11 +288,11 @@ If it exports and returns normally, the result is as follows:
 
 ```
 mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
-+------------+-----------+----------+--------------+
-| FileNumber | TotalRows | FileSize | URL          |
-+------------+-----------+----------+--------------+
-|          1 |         2 |        8 | 192.168.1.10 |
-+------------+-----------+----------+--------------+
++------------+-----------+----------+--------------------------------------------------------------------+
+| FileNumber | TotalRows | FileSize | URL                                                                |
++------------+-----------+----------+--------------------------------------------------------------------+
+|          1 |         2 |        8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
++------------+-----------+----------+--------------------------------------------------------------------+
 1 row in set (0.05 sec)
 ```
 
@@ -209,6 +301,18 @@ mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_"
 * FileSize: The total size of the exported file, in bytes.
 * URL: If it is exported to a local disk, the Compute Node to which it is exported is displayed here.
 
+If a concurrent export is performed, multiple rows of data will be returned.
+
+```
++------------+-----------+----------+--------------------------------------------------------------------+
+| FileNumber | TotalRows | FileSize | URL                                                                |
++------------+-----------+----------+--------------------------------------------------------------------+
+|          1 |         3 |        7 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
+|          1 |         2 |        4 | file:///192.168.1.11/home/work/path/result_{fragment_instance_id}_ |
++------------+-----------+----------+--------------------------------------------------------------------+
+2 rows in set (2.218 sec)
+```
+
 If the execution is incorrect, an error message will be returned, such as:
 
 ```
@@ -219,7 +323,7 @@ ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
 ## Notice
 
 * The CSV format does not support exporting binary types, such as BITMAP and HLL types. These types will be output as `\N`, which is null.
-* The query results are exported from a single BE node and a single thread. Therefore, the export time and the export result set size are positively correlated.
+* If concurrent export is not enabled, the query result is exported by a single thread on a single BE node, so the export time is positively correlated with the size of the result set. Enabling concurrent export can reduce the export time.
 * The export command does not check whether the file and file path exist. Whether the path will be automatically created or whether the existing file will be overwritten is entirely determined by the semantics of the remote storage system.
 * If an error occurs during the export process, the exported file may remain on the remote storage system. Doris will not clean these files. The user needs to manually clean up.
 * The timeout of the export command is the same as the timeout of the query. It can be set by `SET query_timeout = xxx`.
diff --git a/docs/zh-CN/administrator-guide/outfile.md b/docs/zh-CN/administrator-guide/outfile.md
index 04ea0e7..f3274a1 100644
--- a/docs/zh-CN/administrator-guide/outfile.md
+++ b/docs/zh-CN/administrator-guide/outfile.md
@@ -30,7 +30,7 @@ under the License.
 
 ## 语法
 
-`SELECT INTO OUTFILE` 语句可以将查询结果导出到文件中。目前仅支持通过 Broker 进程导出到远端存储,如 HDFS,S3,BOS,COS(腾讯云)上。语法如下
+`SELECT INTO OUTFILE` 语句可以将查询结果导出到文件中。目前支持通过 Broker 进程, 或直接通过 S3 协议导出到远端存储,如 HDFS,S3,BOS,COS(腾讯云)上。语法如下
 
 ```
 query_stmt
@@ -46,9 +46,9 @@ INTO OUTFILE "file_path"
     最终的文件名将由 `my_file_`,文件序号以及文件格式后缀组成。其中文件序号由0开始,数量为文件被分割的数量。如:
     
     ```
-    my_file_0.csv
-    my_file_1.csv
-    my_file_2.csv
+    my_file_abcdefg_0.csv
+    my_file_abcdefg_1.csv
+    my_file_abcdefg_2.csv
     ```
 
 * `[format_as]`
@@ -62,10 +62,15 @@ INTO OUTFILE "file_path"
 
 * `[properties]`
 
-    指定相关属性。目前仅支持通过 Broker 进程进行导出。Broker 相关属性需加前缀 `broker.`。具体参阅[Broker 文档](./broker.html)。
+    指定相关属性。目前支持通过 Broker 进程, 或通过 S3 协议进行导出。
+
+    + Broker 相关属性需加前缀 `broker.`。具体参阅[Broker 文档](./broker.html)。
+    + S3 协议则直接填写 S3 协议配置即可。
 
     ```
     ("broker.prop_key" = "broker.prop_val", ...)
+    or 
+    ("AWS_ENDPOINT" = "xxx", ...)
     ``` 
 
     其他属性:
@@ -80,6 +85,55 @@ INTO OUTFILE "file_path"
     * `line_delimiter`:行分隔符,仅对 CSV 格式适用。默认为 `\n`。
     * `max_file_size`:单个文件的最大大小。默认为 1GB。取值范围在 5MB 到 2GB 之间。超过这个大小的文件将会被切分。
 
+## 并发导出
+
+默认情况下,查询结果集的导出是非并发的,也就是单点导出。如果用户希望查询结果集可以并发导出,需要满足以下条件:
+
+1. session variable 'enable_parallel_outfile' 开启并发导出: ```set enable_parallel_outfile = true;```
+2. 导出方式为 S3 , 而不是使用 broker
+3. 查询可以满足并发导出的需求,比如顶层不包含 sort 等单点节点。(后面会举例说明,哪种属于不可并发导出结果集的查询)
+
+满足以上三个条件,就能触发并发导出查询结果集了。并发度 = ```be_instance_num * parallel_fragment_exec_instance_num```
+
+### 如何验证结果集被并发导出
+
+用户通过 session 变量设置开启并发导出后,如果想验证当前查询是否能进行并发导出,则可以通过下面这个方法。
+
+```
+explain select xxx from xxx where xxx  into outfile "s3://xxx" format as csv properties ("AWS_ENDPOINT" = "xxx", ...);
+```
+
+对查询进行 explain 后,Doris 会返回该查询的规划,如果你发现 ```RESULT FILE SINK``` 出现在 ```PLAN FRAGMENT 1``` 中,就说明导出并发开启成功了。
+如果 ```RESULT FILE SINK``` 出现在 ```PLAN FRAGMENT 0``` 中,则说明当前查询不能进行并发导出 (当前查询不同时满足并发导出的三个条件)。
+
+```
+并发导出的规划示例:
++-----------------------------------------------------------------------------+
+| Explain String                                                              |
++-----------------------------------------------------------------------------+
+| PLAN FRAGMENT 0                                                             |
+|  OUTPUT EXPRS:<slot 2> | <slot 3> | <slot 4> | <slot 5>                     |
+|   PARTITION: UNPARTITIONED                                                  |
+|                                                                             |
+|   RESULT SINK                                                               |
+|                                                                             |
+|   1:EXCHANGE                                                                |
+|                                                                             |
+| PLAN FRAGMENT 1                                                             |
+|  OUTPUT EXPRS:`k1` + `k2`                                                   |
+|   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`multi_tablet`.`k1`   |
+|                                                                             |
+|   RESULT FILE SINK                                                          |
+|   FILE PATH: s3://ml-bd-repo/bpit_test/outfile_1951_                        |
+|   STORAGE TYPE: S3                                                          |
+|                                                                             |
+|   0:OlapScanNode                                                            |
+|      TABLE: multi_tablet                                                    |
++-----------------------------------------------------------------------------+
+```
+
+## 使用示例
+
 1. 示例1
 
     将简单查询结果导出到文件 `hdfs:/path/to/result.txt`。指定导出格式为 CSV。使用 `my_broker` 并设置 kerberos 认证信息。指定列分隔符为 `,`,行分隔符为 `\n`。
@@ -184,6 +238,46 @@ INTO OUTFILE "file_path"
     1. 不存在的path会自动创建
     2. access.key/secret.key/endpoint需要和cos的同学确认。尤其是endpoint的值,不需要填写bucket_name。
 
+5. 示例5
+
+    使用 s3 协议导出到 bos,并且并发导出开启。
+
+    ```
+    set enable_parallel_outfile = true;
+    select k1 from tb1 limit 1000
+    into outfile "s3://my_bucket/export/my_file_"
+    format as csv
+    properties
+    (
+        "AWS_ENDPOINT" = "http://s3.bd.bcebos.com",
+        "AWS_ACCESS_KEY" = "xxxx",
+        "AWS_SECRET_KEY" = "xxx",
+        "AWS_REGION" = "bd"
+    )
+    ```
+
+    最终生成的文件前缀为 `my_file_{fragment_instance_id}_`。
+
+6. 示例6
+
+    使用 s3 协议导出到 bos,并且并发导出 session 变量开启。
+
+    ```
+    set enable_parallel_outfile = true;
+    select k1 from tb1 order by k1 limit 1000
+    into outfile "s3://my_bucket/export/my_file_"
+    format as csv
+    properties
+    (
+        "AWS_ENDPOINT" = "http://s3.bd.bcebos.com",
+        "AWS_ACCESS_KEY" = "xxxx",
+        "AWS_SECRET_KEY" = "xxx",
+        "AWS_REGION" = "bd"
+    )
+    ```
+
+    **但由于查询语句带了一个顶层的排序节点,所以这个查询即使开启并发导出的 session 变量,也是无法并发导出的。**
+
 ## 返回结果
 
 导出命令为同步命令。命令返回,即表示操作结束。同时会返回一行结果来展示导出的执行结果。
@@ -192,11 +286,11 @@ INTO OUTFILE "file_path"
 
 ```
 mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_";
-+------------+-----------+----------+--------------+
-| FileNumber | TotalRows | FileSize | URL          |
-+------------+-----------+----------+--------------+
-|          1 |         2 |        8 | 192.168.1.10 |
-+------------+-----------+----------+--------------+
++------------+-----------+----------+--------------------------------------------------------------------+
+| FileNumber | TotalRows | FileSize | URL                                                                |
++------------+-----------+----------+--------------------------------------------------------------------+
+|          1 |         2 |        8 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
++------------+-----------+----------+--------------------------------------------------------------------+
 1 row in set (0.05 sec)
 ```
 
@@ -205,6 +299,18 @@ mysql> select * from tbl1 limit 10 into outfile "file:///home/work/path/result_"
 * FileSize:导出文件总大小。单位字节。
 * URL:如果是导出到本地磁盘,则这里显示具体导出到哪个 Compute Node。
 
+如果进行了并发导出,则会返回多行数据。
+
+```
++------------+-----------+----------+--------------------------------------------------------------------+
+| FileNumber | TotalRows | FileSize | URL                                                                |
++------------+-----------+----------+--------------------------------------------------------------------+
+|          1 |         3 |        7 | file:///192.168.1.10/home/work/path/result_{fragment_instance_id}_ |
+|          1 |         2 |        4 | file:///192.168.1.11/home/work/path/result_{fragment_instance_id}_ |
++------------+-----------+----------+--------------------------------------------------------------------+
+2 rows in set (2.218 sec)
+```
+
 如果执行错误,则会返回错误信息,如:
 
 ```
@@ -214,7 +320,7 @@ ERROR 1064 (HY000): errCode = 2, detailMessage = Open broker writer failed ...
 
 ## 注意事项
 
-* 查询结果是由单个 BE 节点,单线程导出的。因此导出时间和导出结果集大小正相关。
+* 如果不开启并发导出,查询结果是由单个 BE 节点,单线程导出的。因此导出时间和导出结果集大小正相关。开启并发导出可以降低导出的时间。
 * 导出命令不会检查文件及文件路径是否存在。是否会自动创建路径、或是否会覆盖已存在文件,完全由远端存储系统的语义决定。
 * 如果在导出过程中出现错误,可能会有导出文件残留在远端存储系统上。Doris 不会清理这些文件。需要用户手动清理。
 * 导出命令的超时时间同查询的超时时间。可以通过 `SET query_timeout=xxx` 进行设置。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
index ec9d58a..338a786 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java
@@ -436,13 +436,9 @@ public class CreateRoutineLoadStmt extends DdlStmt {
                 LoadStmt.STRICT_MODE + " should be a boolean");
         execMemLimit = Util.getLongPropertyOrDefault(jobProperties.get(EXEC_MEM_LIMIT_PROPERTY),
                 RoutineLoadJob.DEFAULT_EXEC_MEM_LIMIT, EXEC_MEM_LIMIT_PRED, EXEC_MEM_LIMIT_PROPERTY + "should > 0");
-
-        sendBatchParallelism = 1;
-        if (ConnectContext.get() != null) {
-            sendBatchParallelism = ConnectContext.get().getSessionVariable().getSendBatchParallelism();
-        }
+        
         sendBatchParallelism = ((Long) Util.getLongPropertyOrDefault(jobProperties.get(SEND_BATCH_PARALLELISM),
-                sendBatchParallelism, SEND_BATCH_PARALLELISM_PRED,
+                ConnectContext.get().getSessionVariable().getSendBatchParallelism(), SEND_BATCH_PARALLELISM_PRED,
                 SEND_BATCH_PARALLELISM + " should > 0")).intValue();
 
         if (ConnectContext.get() != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index ef26417..5aecc2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -17,11 +17,13 @@
 
 package org.apache.doris.analysis;
 
+import org.apache.doris.backup.S3Storage;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeNameFormat;
+import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.thrift.TFileFormatType;
@@ -33,6 +35,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -80,6 +83,7 @@ public class OutFileClause {
     }
 
     public static final String LOCAL_FILE_PREFIX = "file:///";
+    private static final String S3_FILE_PREFIX = "S3://";
     private static final String BROKER_PROP_PREFIX = "broker.";
     private static final String PROP_BROKER_NAME = "broker.name";
     private static final String PROP_COLUMN_SEPARATOR = "column_separator";
@@ -147,12 +151,9 @@ public class OutFileClause {
         return schema;
     }
 
-    private void analyze(Analyzer analyzer) throws AnalysisException {
+    private void analyze(Analyzer analyzer) throws UserException {
         analyzeFilePath();
 
-        if (Strings.isNullOrEmpty(filePath)) {
-            throw new AnalysisException("Must specify file in OUTFILE clause");
-        }
         switch (this.format) {
             case "csv":
                 fileFormatType = TFileFormatType.FORMAT_CSV_PLAIN;
@@ -173,7 +174,7 @@ public class OutFileClause {
         }
     }
 
-    public void analyze(Analyzer analyzer, SelectStmt stmt) throws AnalysisException {
+    public void analyze(Analyzer analyzer, SelectStmt stmt) throws UserException {
         analyze(analyzer);
 
         if (isParquetFormat()) {
@@ -299,15 +300,19 @@ public class OutFileClause {
         } else {
             isLocalOutput = false;
         }
+
+        if (Strings.isNullOrEmpty(filePath)) {
+            throw new AnalysisException("Must specify file in OUTFILE clause");
+        }
     }
 
-    private void analyzeProperties() throws AnalysisException {
+    private void analyzeProperties() throws UserException {
         if (properties == null || properties.isEmpty()) {
             return;
         }
 
         Set<String> processedPropKeys = Sets.newHashSet();
-        getBrokerProperties(processedPropKeys);
+        analyzeBrokerDesc(processedPropKeys);
 
         if (properties.containsKey(PROP_COLUMN_SEPARATOR)) {
             if (!isCsvFormat()) {
@@ -350,12 +355,23 @@ public class OutFileClause {
         }
     }
 
-    private void getBrokerProperties(Set<String> processedPropKeys) {
-        if (!properties.containsKey(PROP_BROKER_NAME)) {
+    /**
+     * The following two situations will generate the corresponding @brokerDesc:
+     * 1. broker: with broker name
+     * 2. s3: with s3 pattern path, without broker name
+     */
+    private void analyzeBrokerDesc(Set<String> processedPropKeys) throws UserException {
+        String brokerName = properties.get(PROP_BROKER_NAME);
+        StorageBackend.StorageType storageType;
+        if (properties.containsKey(PROP_BROKER_NAME)) {
+            processedPropKeys.add(PROP_BROKER_NAME);
+            storageType = StorageBackend.StorageType.BROKER;
+        } else if (filePath.toUpperCase().startsWith(S3_FILE_PREFIX)) {
+            brokerName = StorageBackend.StorageType.S3.name();
+            storageType = StorageBackend.StorageType.S3;
+        } else {
             return;
         }
-        String brokerName = properties.get(PROP_BROKER_NAME);
-        processedPropKeys.add(PROP_BROKER_NAME);
 
         Map<String, String> brokerProps = Maps.newHashMap();
         Iterator<Map.Entry<String, String>> iter = properties.entrySet().iterator();
@@ -364,10 +380,16 @@ public class OutFileClause {
             if (entry.getKey().startsWith(BROKER_PROP_PREFIX) && !entry.getKey().equals(PROP_BROKER_NAME)) {
                 brokerProps.put(entry.getKey().substring(BROKER_PROP_PREFIX.length()), entry.getValue());
                 processedPropKeys.add(entry.getKey());
+            } else if (entry.getKey().toUpperCase().startsWith(S3Storage.S3_PROPERTIES_PREFIX)) {
+                brokerProps.put(entry.getKey(), entry.getValue());
+                processedPropKeys.add(entry.getKey());
             }
         }
 
-        brokerDesc = new BrokerDesc(brokerName, brokerProps);
+        brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
+        if (storageType == StorageBackend.StorageType.S3) {
+            S3Storage.checkS3(new CaseInsensitiveMap(brokerProps));
+        }
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
index 471e306..0f2167e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/SelectStmt.java
@@ -379,7 +379,7 @@ public class SelectStmt extends QueryStmt {
         this.tableAliasGenerator = tableAliasGenerator;
     }
 
-    public void analyze(Analyzer analyzer) throws AnalysisException, UserException {
+    public void analyze(Analyzer analyzer) throws UserException {
         if (isAnalyzed()) {
             return;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
index e0bb077..2032229 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/S3Storage.java
@@ -66,6 +66,7 @@ import software.amazon.awssdk.services.s3.model.PutObjectResponse;
 import software.amazon.awssdk.services.s3.model.S3Exception;
 
 public class S3Storage extends BlobStorage {
+    public static final String S3_PROPERTIES_PREFIX = "AWS";
     public static final String S3_AK = "AWS_ACCESS_KEY";
     public static final String S3_SK = "AWS_SECRET_KEY";
     public static final String S3_ENDPOINT = "AWS_ENDPOINT";
@@ -105,7 +106,7 @@ public class S3Storage extends BlobStorage {
 
     }
 
-    private void checkS3() throws UserException {
+    public static void checkS3(CaseInsensitiveMap caseInsensitiveProperties) throws UserException {
         if (!caseInsensitiveProperties.containsKey(S3_REGION)) {
             throw new UserException("AWS_REGION not found.");
         }
@@ -122,7 +123,7 @@ public class S3Storage extends BlobStorage {
 
     private S3Client getClient(String bucket) throws UserException {
         if (client == null) {
-            checkS3();
+            checkS3(caseInsensitiveProperties);
             URI tmpEndpoint = URI.create(caseInsensitiveProperties.get(S3_ENDPOINT).toString());
             AwsBasicCredentials awsBasic = AwsBasicCredentials.create(
                     caseInsensitiveProperties.get(S3_AK).toString(),
@@ -307,7 +308,7 @@ public class S3Storage extends BlobStorage {
     // broker file pattern glob is too complex, so we use hadoop directly
     public Status list(String remotePath, List<RemoteFile> result, boolean fileNameOnly) {
         try {
-            checkS3();
+            checkS3(caseInsensitiveProperties);
             Configuration conf = new Configuration();
             String s3AK = caseInsensitiveProperties.get(S3_AK).toString();
             String s3Sk = caseInsensitiveProperties.get(S3_SK).toString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
index fde11b9..f09c816 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java
@@ -407,9 +407,6 @@ public class ExportJob implements Writable {
                         new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.RANDOM);
                 break;
             case ODBC:
-                fragment = new PlanFragment(
-                        new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.UNPARTITIONED);
-                break;
             case MYSQL:
                 fragment = new PlanFragment(
                         new PlanFragmentId(nextId.getAndIncrement()), scanNode, DataPartition.UNPARTITIONED);
@@ -422,7 +419,7 @@ public class ExportJob implements Writable {
         scanNode.setFragmentId(fragment.getFragmentId());
         fragment.setSink(exportSink);
         try {
-            fragment.finalize(analyzer, false);
+            fragment.finalize(null);
         } catch (Exception e) {
             LOG.info("Fragment finalize failed. e= {}", e);
             throw new UserException("Fragment finalize failed");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index a714475..e8928dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -28,7 +28,6 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.OlapTable;
 import org.apache.doris.common.LoadException;
 import org.apache.doris.common.MetaNotFoundException;
-import org.apache.doris.common.NotImplementedException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.load.BrokerFileGroup;
@@ -145,12 +144,7 @@ public class LoadingTaskPlanner {
 
         // 4. finalize
         for (PlanFragment fragment : fragments) {
-            try {
-                fragment.finalize(analyzer, false);
-            } catch (NotImplementedException e) {
-                LOG.info("Fragment finalize failed.{}", e.getMessage());
-                throw new UserException("Fragment finalize failed.");
-            }
+            fragment.finalize(null);
         }
         Collections.reverse(fragments);
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
index 6fed2cd..a686d26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -98,7 +98,7 @@ public class UpdatePlanner extends Planner {
                 DataPartition.RANDOM);
         planFragment.setSink(olapTableSink);
         planFragment.setOutputExprs(computeOutputExprs());
-        planFragment.finalize(analyzer, false);
+        planFragment.finalize(null);
         fragments.add(planFragment);
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 91b23b7..f03527a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -78,6 +78,13 @@ public class ExchangeNode extends PlanNode {
         computeTupleIds();
     }
 
+    public boolean isMergingExchange() {
+        if (mergeInfo != null) {
+            return true;
+        }
+        return false;
+    }
+
     @Override
     public final void computeTupleIds() {
         clearTupleIds();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index e865cf7..af2a7f0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -17,16 +17,16 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
-import org.apache.doris.common.NotImplementedException;
+import org.apache.doris.analysis.QueryStmt;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.TreeNode;
-import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TPlanFragment;
-import org.apache.doris.thrift.TResultSinkType;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -186,6 +186,18 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         this.outputExprs = Expr.cloneList(outputExprs, null);
     }
 
+    public void resetOutputExprs(TupleDescriptor tupleDescriptor) {
+        this.outputExprs = Lists.newArrayList();
+        for (SlotDescriptor slotDescriptor : tupleDescriptor.getSlots()) {
+            SlotRef slotRef = new SlotRef(slotDescriptor);
+            outputExprs.add(slotRef);
+        }
+    }
+
+    public ArrayList<Expr> getOutputExprs() {
+        return outputExprs;
+    }
+
     public void setBuilderRuntimeFilterIds(RuntimeFilterId rid) {
         this.builderRuntimeFilterIds.add(rid);
     }
@@ -197,8 +209,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     /**
      * Finalize plan tree and create stream sink, if needed.
      */
-    public void finalize(Analyzer analyzer, boolean validateFileFormats)
-            throws UserException, NotImplementedException {
+    public void finalize(QueryStmt queryStmt) {
         if (sink != null) {
             return;
         }
@@ -215,11 +226,14 @@ public class PlanFragment extends TreeNode<PlanFragment> {
                 // "select 1 + 2"
                 return;
             }
-            // add ResultSink
             Preconditions.checkState(sink == null);
-            // we're streaming to an result sink
-            ResultSink bufferSink = new ResultSink(planRoot.getId(), TResultSinkType.MYSQL_PROTOCAL);
-            sink = bufferSink;
+            if (queryStmt != null && queryStmt.hasOutFileClause()) {
+                sink = new ResultFileSink(planRoot.getId(), queryStmt.getOutFileClause());
+            } else {
+                // add ResultSink
+                // we're streaming to an result sink
+                sink = new ResultSink(planRoot.getId());
+            }
         }
     }
 
@@ -357,6 +371,11 @@ public class PlanFragment extends TreeNode<PlanFragment> {
         this.sink = sink;
     }
 
+    public void resetSink(DataSink sink) {
+        sink.setFragment(this);
+        this.sink = sink;
+    }
+
     public PlanFragmentId getFragmentId() {
         return fragmentId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 8b30ad4..c2503c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -275,6 +275,10 @@ abstract public class PlanNode extends TreeNode<PlanNode> {
         return tupleIds;
     }
 
+    public void resetTupleIds(ArrayList<TupleId> tupleIds) {
+        this.tupleIds = tupleIds;
+    }
+
     public ArrayList<TupleId> getTupleIds() {
         Preconditions.checkState(tupleIds != null);
         return tupleIds;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
index 45f0ada..00454b0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java
@@ -26,21 +26,23 @@ import org.apache.doris.analysis.SelectStmt;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.StatementBase;
+import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.common.util.VectorizedUtil;
+import org.apache.doris.catalog.ScalarType;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.profile.PlanTreeBuilder;
 import org.apache.doris.common.profile.PlanTreePrinter;
+import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.rewrite.mvrewrite.MVSelectFailedException;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TQueryOptions;
+import org.apache.doris.thrift.TRuntimeFilterMode;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
-import org.apache.doris.thrift.TRuntimeFilterMode;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -242,14 +244,14 @@ public class Planner {
                     rootFragment.getPlanRoot().getOutputSmap(), analyzer, false);
             rootFragment.setOutputExprs(resExprs);
         }
-        // rootFragment.setOutputExprs(exprs);
         LOG.debug("finalize plan fragments");
         for (PlanFragment fragment : fragments) {
-            fragment.finalize(analyzer, !queryOptions.allow_unsupported_formats);
+            fragment.finalize(queryStmt);
         }
+
         Collections.reverse(fragments);
 
-        setOutfileSink(queryStmt);
+        pushDownResultFileSink(analyzer);
 
         if (queryStmt instanceof SelectStmt) {
             SelectStmt selectStmt = (SelectStmt) queryStmt;
@@ -263,21 +265,6 @@ public class Planner {
         }
     }
 
-    // if query stmt has OUTFILE clause, set info into ResultSink.
-    // this should be done after fragments are generated.
-    private void setOutfileSink(QueryStmt queryStmt) {
-        if (!queryStmt.hasOutFileClause()) {
-            return;
-        }
-        PlanFragment topFragment = fragments.get(0);
-        if (!(topFragment.getSink() instanceof ResultSink)) {
-            return;
-        }
-
-        ResultSink resultSink = (ResultSink) topFragment.getSink();
-        resultSink.setOutfileInfo(queryStmt.getOutFileClause());
-    }
-
     /**
      * If there are unassigned conjuncts, returns a SelectNode on top of root that evaluate those conjuncts; otherwise
      * returns root unchanged.
@@ -298,9 +285,96 @@ public class Planner {
         return selectNode;
     }
 
+    /**
+     * This function is mainly used to try to push the top-level result file sink down one layer.
+     * After the pushdown, the result file sink can export the result set concurrently.
+     * The pushdown needs to meet the following conditions:
+     * 1. The session variable for concurrent result set export is enabled
+     * 2. The plan root of the top-level fragment is not a merging exchange node
+     * 3. The export uses the S3 protocol instead of a broker
+     *
+     * After satisfying the above three conditions,
+     * the result file sink and the associated output expr will be pushed down to the next layer.
+     * The second plan fragment performs the expression computation and exports the result set.
+     * The top plan fragment only summarizes the status of the exported result set and returns it to the FE.
+     */
+    private void pushDownResultFileSink(Analyzer analyzer) {
+        if (fragments.size() < 1) {
+            return;
+        }
+        if (!(fragments.get(0).getSink() instanceof ResultFileSink)) {
+            return;
+        }
+        if (!ConnectContext.get().getSessionVariable().isEnableParallelOutfile()) {
+            return;
+        }
+        if (!(fragments.get(0).getPlanRoot() instanceof ExchangeNode)) {
+            return;
+        }
+        PlanFragment topPlanFragment = fragments.get(0);
+        ExchangeNode topPlanNode = (ExchangeNode) topPlanFragment.getPlanRoot();
+        // try to push down result file sink
+        if (topPlanNode.isMergingExchange()) {
+            return;
+        }
+        PlanFragment secondPlanFragment = fragments.get(1);
+        ResultFileSink resultFileSink = (ResultFileSink) topPlanFragment.getSink();
+        if (resultFileSink.getStorageType() == StorageBackend.StorageType.BROKER) {
+            return;
+        }
+        if (secondPlanFragment.getOutputExprs() != null) {
+            return;
+        }
+        // create result file sink desc
+        TupleDescriptor fileStatusDesc = constructFileStatusTupleDesc(analyzer);
+        resultFileSink.resetByDataStreamSink((DataStreamSink) secondPlanFragment.getSink());
+        resultFileSink.setOutputTupleId(fileStatusDesc.getId());
+        secondPlanFragment.setOutputExprs(topPlanFragment.getOutputExprs());
+        secondPlanFragment.resetSink(resultFileSink);
+        ResultSink resultSink = new ResultSink(topPlanNode.getId());
+        topPlanFragment.resetSink(resultSink);
+        topPlanFragment.resetOutputExprs(fileStatusDesc);
+        topPlanFragment.getPlanRoot().resetTupleIds(Lists.newArrayList(fileStatusDesc.getId()));
+    }
+
+    /**
+     * Construct a tuple for file status, the tuple schema as following:
+     * | FileNumber | Int     |
+     * | TotalRows  | Bigint  |
+     * | FileSize   | Bigint  |
+     * | URL        | Varchar |
+     */
+    private TupleDescriptor constructFileStatusTupleDesc(Analyzer analyzer) {
+        TupleDescriptor resultFileStatusTupleDesc =
+                analyzer.getDescTbl().createTupleDescriptor("result_file_status");
+        resultFileStatusTupleDesc.setIsMaterialized(true);
+        SlotDescriptor fileNumber = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
+        fileNumber.setLabel("FileNumber");
+        fileNumber.setType(ScalarType.createType(PrimitiveType.INT));
+        fileNumber.setIsMaterialized(true);
+        fileNumber.setIsNullable(false);
+        SlotDescriptor totalRows = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
+        totalRows.setLabel("TotalRows");
+        totalRows.setType(ScalarType.createType(PrimitiveType.BIGINT));
+        totalRows.setIsMaterialized(true);
+        totalRows.setIsNullable(false);
+        SlotDescriptor fileSize = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
+        fileSize.setLabel("FileSize");
+        fileSize.setType(ScalarType.createType(PrimitiveType.BIGINT));
+        fileSize.setIsMaterialized(true);
+        fileSize.setIsNullable(false);
+        SlotDescriptor url = analyzer.getDescTbl().addSlotDescriptor(resultFileStatusTupleDesc);
+        url.setLabel("URL");
+        url.setType(ScalarType.createType(PrimitiveType.VARCHAR));
+        url.setIsMaterialized(true);
+        url.setIsNullable(false);
+        resultFileStatusTupleDesc.computeStatAndMemLayout();
+        return resultFileStatusTupleDesc;
+    }
+
     private static class QueryStatisticsTransferOptimizer {
         private final PlanFragment root;
-        
+
         public QueryStatisticsTransferOptimizer(PlanFragment root) {
             Preconditions.checkNotNull(root);
             this.root = root;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
similarity index 50%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
index ea91583..1db7263 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultFileSink.java
@@ -18,55 +18,85 @@
 package org.apache.doris.planner;
 
 import org.apache.doris.analysis.OutFileClause;
-import org.apache.doris.common.util.VectorizedUtil;
+import org.apache.doris.analysis.StorageBackend;
+import org.apache.doris.analysis.TupleId;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TResultFileSink;
 import org.apache.doris.thrift.TResultFileSinkOptions;
-import org.apache.doris.thrift.TResultSink;
-import org.apache.doris.thrift.TResultSinkType;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
-/**
- * Result sink that forwards data to
- * 1. the FE data receiver, which result the final query result to user client. Or,
- * 2. files that save the result data
- */
-public class ResultSink extends DataSink {
-    private final PlanNodeId exchNodeId;
-    private TResultSinkType sinkType;
-    private String brokerName;
+public class ResultFileSink extends DataSink {
+    private PlanNodeId exchNodeId;
     private TResultFileSinkOptions fileSinkOptions;
+    private String brokerName;
+    private StorageBackend.StorageType storageType;
+    private DataPartition outputPartition;
+    private TupleId outputTupleId;
 
-    public ResultSink(PlanNodeId exchNodeId, TResultSinkType sinkType) {
+    public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause) {
         this.exchNodeId = exchNodeId;
-        this.sinkType = sinkType;
+        this.fileSinkOptions = outFileClause.toSinkOptions();
+        this.brokerName = outFileClause.getBrokerDesc() == null ? null :
+                outFileClause.getBrokerDesc().getName();
+        this.storageType = outFileClause.getBrokerDesc() == null ? StorageBackend.StorageType.LOCAL :
+                outFileClause.getBrokerDesc().getStorageType();
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public StorageBackend.StorageType getStorageType() {
+        return storageType;
+    }
+
+    public void setBrokerAddr(String ip, int port) {
+        Preconditions.checkNotNull(fileSinkOptions);
+        fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(ip, port)));
+    }
+
+    public void resetByDataStreamSink(DataStreamSink dataStreamSink) {
+        exchNodeId = dataStreamSink.getExchNodeId();
+        outputPartition = dataStreamSink.getOutputPartition();
+    }
+
+    public void setOutputTupleId(TupleId tupleId) {
+        outputTupleId = tupleId;
     }
 
     @Override
     public String getExplainString(String prefix, TExplainLevel explainLevel) {
         StringBuilder strBuilder = new StringBuilder();
         strBuilder.append(prefix);
-        if (VectorizedUtil.isVectorized()) {
-            strBuilder.append("V");
+        strBuilder.append("RESULT FILE SINK\n");
+        strBuilder.append("  FILE PATH: " + fileSinkOptions.getFilePath() + "\n");
+        strBuilder.append("  STORAGE TYPE: " + storageType.name() + "\n");
+        switch (storageType) {
+            case BROKER:
+                strBuilder.append("  broker name: " + brokerName + "\n");
+                break;
+            default:
+                break;
         }
-        strBuilder.append("RESULT SINK\n");
         return strBuilder.toString();
     }
 
     @Override
     protected TDataSink toThrift() {
-        TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK);
-        TResultSink tResultSink = new TResultSink();
-        tResultSink.setType(sinkType);
-        if (fileSinkOptions != null) {
-            tResultSink.setFileOptions(fileSinkOptions);
+        TDataSink result = new TDataSink(TDataSinkType.RESULT_FILE_SINK);
+        TResultFileSink tResultFileSink = new TResultFileSink();
+        tResultFileSink.setFileOptions(fileSinkOptions);
+        tResultFileSink.setStorageBackendType(storageType.toThrift());
+        tResultFileSink.setDestNodeId(exchNodeId.asInt());
+        if (outputTupleId != null) {
+            tResultFileSink.setOutputTupleId(outputTupleId.asInt());
         }
-        result.setResultSink(tResultSink);
+        result.setResultFileSink(tResultFileSink);
         return result;
     }
 
@@ -77,29 +107,6 @@ public class ResultSink extends DataSink {
 
     @Override
     public DataPartition getOutputPartition() {
-        return null;
-    }
-
-    public boolean isOutputFileSink() {
-        return sinkType == TResultSinkType.FILE;
-    }
-
-    public boolean needBroker() {
-        return !Strings.isNullOrEmpty(brokerName);
-    }
-
-    public String getBrokerName() {
-        return brokerName;
-    }
-
-    public void setOutfileInfo(OutFileClause outFileClause) {
-        sinkType = TResultSinkType.FILE;
-        fileSinkOptions = outFileClause.toSinkOptions();
-        brokerName = outFileClause.getBrokerDesc() == null ? null : outFileClause.getBrokerDesc().getName();
-    }
-
-    public void setBrokerAddr(String ip, int port) {
-        Preconditions.checkNotNull(fileSinkOptions);
-        fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(ip, port)));
+        return outputPartition;
     }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
index ea91583..c940851 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ResultSink.java
@@ -17,19 +17,11 @@
 
 package org.apache.doris.planner;
 
-import org.apache.doris.analysis.OutFileClause;
 import org.apache.doris.common.util.VectorizedUtil;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TExplainLevel;
-import org.apache.doris.thrift.TNetworkAddress;
-import org.apache.doris.thrift.TResultFileSinkOptions;
 import org.apache.doris.thrift.TResultSink;
-import org.apache.doris.thrift.TResultSinkType;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
 
 /**
  * Result sink that forwards data to
@@ -38,13 +30,9 @@ import com.google.common.collect.Lists;
  */
 public class ResultSink extends DataSink {
     private final PlanNodeId exchNodeId;
-    private TResultSinkType sinkType;
-    private String brokerName;
-    private TResultFileSinkOptions fileSinkOptions;
 
-    public ResultSink(PlanNodeId exchNodeId, TResultSinkType sinkType) {
+    public ResultSink(PlanNodeId exchNodeId) {
         this.exchNodeId = exchNodeId;
-        this.sinkType = sinkType;
     }
 
     @Override
@@ -62,10 +50,6 @@ public class ResultSink extends DataSink {
     protected TDataSink toThrift() {
         TDataSink result = new TDataSink(TDataSinkType.RESULT_SINK);
         TResultSink tResultSink = new TResultSink();
-        tResultSink.setType(sinkType);
-        if (fileSinkOptions != null) {
-            tResultSink.setFileOptions(fileSinkOptions);
-        }
         result.setResultSink(tResultSink);
         return result;
     }
@@ -79,27 +63,4 @@ public class ResultSink extends DataSink {
     public DataPartition getOutputPartition() {
         return null;
     }
-
-    public boolean isOutputFileSink() {
-        return sinkType == TResultSinkType.FILE;
-    }
-
-    public boolean needBroker() {
-        return !Strings.isNullOrEmpty(brokerName);
-    }
-
-    public String getBrokerName() {
-        return brokerName;
-    }
-
-    public void setOutfileInfo(OutFileClause outFileClause) {
-        sinkType = TResultSinkType.FILE;
-        fileSinkOptions = outFileClause.toSinkOptions();
-        brokerName = outFileClause.getBrokerDesc() == null ? null : outFileClause.getBrokerDesc().getName();
-    }
-
-    public void setBrokerAddr(String ip, int port) {
-        Preconditions.checkNotNull(fileSinkOptions);
-        fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(ip, port)));
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
index fdcfa8b..5bd4cf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java
@@ -159,7 +159,7 @@ public class StreamLoadPlanner {
         PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), scanNode, DataPartition.UNPARTITIONED);
         fragment.setSink(olapTableSink);
 
-        fragment.finalize(null, false);
+        fragment.finalize(null);
 
         TExecPlanFragmentParams params = new TExecPlanFragmentParams();
         params.setProtocolVersion(PaloInternalServiceVersion.V1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 5073a8a..b771f43 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -19,6 +19,7 @@ package org.apache.doris.qe;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.Config;
@@ -47,6 +48,7 @@ import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.Planner;
+import org.apache.doris.planner.ResultFileSink;
 import org.apache.doris.planner.ResultSink;
 import org.apache.doris.planner.RuntimeFilter;
 import org.apache.doris.planner.RuntimeFilterId;
@@ -446,7 +448,8 @@ public class Coordinator {
         // create result receiver
         PlanFragmentId topId = fragments.get(0).getFragmentId();
         FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
-        if (topParams.fragment.getSink() instanceof ResultSink) {
+        DataSink topDataSink = topParams.fragment.getSink();
+        if (topDataSink instanceof ResultSink || topDataSink instanceof ResultFileSink) {
             TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
             receiver = new ResultReceiver(
                     topParams.instanceExecParams.get(0).instanceId,
@@ -456,16 +459,15 @@ public class Coordinator {
 
             LOG.info("dispatch query job: {} to {}", DebugUtil.printId(queryId), topParams.instanceExecParams.get(0).host);
 
-            // set the broker address for OUTFILE sink
-            ResultSink resultSink = (ResultSink) topParams.fragment.getSink();
-            if (resultSink.isOutputFileSink() && resultSink.needBroker()) {
-                FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr().getBroker(resultSink.getBrokerName(),
-                        execBeAddr.getHostname());
-                resultSink.setBrokerAddr(broker.ip, broker.port);
-                LOG.info("OUTFILE through broker: {}:{}", broker.ip, broker.port);
+            if (topDataSink instanceof ResultFileSink
+                    && ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
+                // set the broker address for OUTFILE sink
+                ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
+                FsBroker broker = Catalog.getCurrentCatalog().getBrokerMgr()
+                        .getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
+                topResultFileSink.setBrokerAddr(broker.ip, broker.port);
             }
-
-        } else {
+        } else {
             // This is a load process.
             this.queryOptions.setIsReportSuccess(true);
             this.queryOptions.setEnableVectorizedEngine(false);
@@ -863,16 +865,17 @@ public class Coordinator {
                         params.instanceExecParams.size() + destParams.perExchNumSenders.get(exchId.asInt()));
             }
 
-            if (sink.getOutputPartition().isBucketShuffleHashPartition()) {
+            if (sink.getOutputPartition() != null
+                    && sink.getOutputPartition().isBucketShuffleHashPartition()) {
                 // the destFragment must be bucket shuffle
                 Preconditions.checkState(bucketShuffleJoinController.
                         isBucketShuffleJoin(destFragment.getFragmentId().asInt()), "Sink is" +
-                        "Bucket Shffulle Partition, The destFragment must have bucket shuffle join node ");
+                        "Bucket Shuffle Partition, The destFragment must have bucket shuffle join node ");
 
                 int bucketSeq = 0;
                 int bucketNum = bucketShuffleJoinController.getFragmentBucketNum(destFragment.getFragmentId());
                 TNetworkAddress dummyServer = new TNetworkAddress("0.0.0.0", 0);
-               
+
                 // when the left table is empty, its bucketSeqSet is empty.
                 // set right table destination address to the address of left table
                 if (destParams.instanceExecParams.size() == 1 && destParams.instanceExecParams.get(0).bucketSeqSet.isEmpty()) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 25d2778..4e9e0e9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -28,7 +28,6 @@ import org.apache.doris.thrift.TResourceLimit;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
-import org.json.JSONObject;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -38,6 +37,8 @@ import java.lang.reflect.Field;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.json.JSONObject;
+
 // System variable
 public class SessionVariable implements Serializable, Writable {
     static final Logger LOG = LogManager.getLogger(StmtExecutor.class);
@@ -156,6 +157,8 @@ public class SessionVariable implements Serializable, Writable {
     public static final String ENABLE_VECTORIZED_ENGINE = "enable_vectorized_engine";
 
     public static final String CPU_RESOURCE_LIMIT = "cpu_resource_limit";
+    
+    public static final String ENABLE_PARALLEL_OUTFILE = "enable_parallel_outfile";
 
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, String>();
@@ -368,9 +371,10 @@ public class SessionVariable implements Serializable, Writable {
     private int runtimeFilterType = 1;
     @VariableMgr.VarAttr(name = RUNTIME_FILTER_MAX_IN_NUM)
     private int runtimeFilterMaxInNum = 1024;
-
     @VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE)
     public boolean enableVectorizedEngine = false;
+    @VariableMgr.VarAttr(name = ENABLE_PARALLEL_OUTFILE)
+    public boolean enableParallelOutfile = false;
 
     @VariableMgr.VarAttr(name = CPU_RESOURCE_LIMIT)
     public int cpuResourceLimit = -1;
@@ -766,7 +770,7 @@ public class SessionVariable implements Serializable, Writable {
     public boolean isDeleteWithoutPartition() {
         return deleteWithoutPartition;
     }
-    
+
     public boolean isExtractWideRangeExpr() {
         return extractWideRangeExpr;
     }
@@ -779,6 +783,10 @@ public class SessionVariable implements Serializable, Writable {
         return sendBatchParallelism;
     }
 
+    public boolean isEnableParallelOutfile() {
+        return enableParallelOutfile;
+    }
+
     // Serialize to thrift object
     // used for rest api
     public TQueryOptions toThrift() {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
index bce2e07..8b55b5f 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/CreateRoutineLoadStmtTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.load.loadv2.LoadTask;
 import org.apache.doris.load.routineload.LoadDataSourceType;
 import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -138,7 +139,8 @@ public class CreateRoutineLoadStmtTest {
     }
 
     @Test
-    public void testAnalyze(@Injectable Analyzer analyzer) throws UserException {
+    public void testAnalyze(@Injectable Analyzer analyzer,
+                            @Injectable SessionVariable sessionVariable) throws UserException {
         String jobName = "job1";
         String dbName = "db1";
         LabelName labelName = new LabelName(dbName, jobName);
@@ -178,6 +180,15 @@ public class CreateRoutineLoadStmtTest {
             }
         };
 
+        new Expectations(){
+            {
+                ctx.getSessionVariable();
+                result = sessionVariable;
+                sessionVariable.getSendBatchParallelism();
+                result = 1;
+            }
+        };
+
         createRoutineLoadStmt.analyze(analyzer);
 
         Assert.assertNotNull(createRoutineLoadStmt.getRoutineLoadDesc());
diff --git a/gensrc/thrift/DataSinks.thrift b/gensrc/thrift/DataSinks.thrift
index b8fe8b2..3556089 100644
--- a/gensrc/thrift/DataSinks.thrift
+++ b/gensrc/thrift/DataSinks.thrift
@@ -33,6 +33,7 @@ enum TDataSinkType {
     OLAP_TABLE_SINK,
     MEMORY_SCRATCH_SINK,
     ODBC_TABLE_SINK,
+    RESULT_FILE_SINK,
     VRESULT_SINK,
     VDATA_STREAM_SINK,
 }
@@ -79,6 +80,13 @@ struct TResultSink {
     2: optional TResultFileSinkOptions file_options
 }
 
+struct TResultFileSink {
+    1: optional TResultFileSinkOptions file_options;
+    2: optional Types.TStorageBackendType storage_backend_type;
+    3: optional Types.TPlanNodeId dest_node_id;
+    4: optional Types.TTupleId output_tuple_id;
+}
+
 struct TMysqlTableSink {
     1: required string host
     2: required i32 port
@@ -147,5 +155,6 @@ struct TDataSink {
   7: optional TOlapTableSink olap_table_sink
   8: optional TMemoryScratchSink memory_scratch_sink
   9: optional TOdbcTableSink odbc_table_sink
+  10: optional TResultFileSink result_file_sink
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org