You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/13 12:41:54 UTC

[incubator-doris] branch master updated: [fix](brpc) Embed serialized request into the attachment and transmit it through http brpc (#9803)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d58e00c49c [fix](brpc) Embed serialized request into the attachment and transmit it through http brpc (#9803)
d58e00c49c is described below

commit d58e00c49c6424a90209113f40d633d784003829
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Mon Jun 13 20:41:48 2022 +0800

    [fix](brpc) Embed serialized request into the attachment and transmit it through http brpc (#9803)
    
    When the length of `Tuple/Block data` is greater than 2G, serialize the ProtoBuf request, embed it together
    with the `Tuple/Block data` into the controller attachment, and transmit it through http brpc.
    
    This is to avoid errors when the length of the protoBuf request exceeds 2G:
    `Bad request, error_text=[E1003]Fail to compress request`.
    
    In #7164, `Tuple/Block data` was put into the attachment and sent via the default `baidu_std brpc`,
    but an attachment that exceeds 2G is truncated. Sending via `http brpc` has no 2G limit.
    
    Also, #7921 considered putting `Tuple/Block data` into the attachment for transport by default, since this
    theoretically saves one serialization and improves performance. However, testing showed that performance did
    not improve, while the memory peak increased because of the additional memory copy.
---
 be/src/common/config.h                           |   8 +-
 be/src/exec/tablet_sink.cpp                      |  39 +++--
 be/src/exec/tablet_sink.h                        |  15 +-
 be/src/runtime/data_stream_sender.cpp            |  48 +++---
 be/src/runtime/data_stream_sender.h              |  15 +-
 be/src/runtime/row_batch.cpp                     |  33 ++---
 be/src/runtime/row_batch.h                       |   4 +-
 be/src/service/internal_service.cpp              | 181 ++++++++++++++++++++---
 be/src/service/internal_service.h                |  39 +++++
 be/src/util/brpc_client_cache.h                  |  20 ++-
 be/src/util/proto_util.h                         | 110 +++++++++++++-
 be/src/vec/core/block.cpp                        |  21 +--
 be/src/vec/core/block.h                          |   2 +-
 be/src/vec/sink/vdata_stream_sender.cpp          |  37 ++++-
 be/src/vec/sink/vdata_stream_sender.h            |   9 +-
 be/src/vec/sink/vtablet_sink.cpp                 |  33 ++++-
 be/src/vec/sink/vtablet_sink.h                   |   5 -
 be/test/vec/core/block_test.cpp                  |   7 +-
 docs/en/docs/admin-manual/config/be-config.md    |   6 +-
 docs/zh-CN/docs/admin-manual/config/be-config.md |   6 +-
 gensrc/proto/internal_service.proto              |   7 +
 21 files changed, 496 insertions(+), 149 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index fd37ca0b48..6fd53e69d5 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -562,9 +562,11 @@ CONF_String(default_rowset_type, "BETA");
 CONF_Int64(brpc_max_body_size, "3147483648");
 // Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED
 CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
-// Whether to transfer RowBatch in ProtoBuf Request to Controller Attachment and send it
-// through brpc, this will be faster and avoid the error of Request length overflow.
-CONF_mBool(transfer_data_by_brpc_attachment, "false");
+// TODO(zxy): expect to be true in v1.3
+// Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into
+// Controller Attachment and send it through http brpc when the length of the Tuple/Block data
+// is greater than 1.8G. This is to avoid the error of Request length overflow (2G).
+CONF_mBool(transfer_large_data_by_brpc, "false");
 
 // max number of txns for every txn_partition_map in txn manager
 // this is a self protection to avoid too many txns saving in manager
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f5567a9165..e1d60f694c 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -48,9 +48,6 @@ namespace stream_load {
 
 NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id)
         : _parent(parent), _index_channel(index_channel), _node_id(node_id) {
-    if (_parent->_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer;
-    }
     _node_channel_tracker = MemTracker::create_tracker(
             -1, fmt::format("NodeChannel:indexID={}:threadId={}",
                             std::to_string(_index_channel->_index_id), tls_ctx()->thread_id_str()));
@@ -79,6 +76,7 @@ NodeChannel::~NodeChannel() noexcept {
 Status NodeChannel::init(RuntimeState* state) {
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     _tuple_desc = _parent->_output_tuple_desc;
+    _state = state;
     auto node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
         std::stringstream ss;
@@ -471,7 +469,7 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         Status st = row_batch->serialize(request.mutable_row_batch(), &uncompressed_bytes,
-                                         &compressed_bytes, _tuple_data_buffer_ptr);
+                                         &compressed_bytes, _parent->_transfer_large_data_by_brpc);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
             _add_batch_closure->clear_in_flight();
@@ -515,14 +513,31 @@ void NodeChannel::try_send_batch(RuntimeState* state) {
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
     }
 
-    if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) {
-        request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
-                                              ReusableClosure<PTabletWriterAddBatchResult>>(
-                &request, _tuple_data_buffer, _add_batch_closure);
+    if (_parent->_transfer_large_data_by_brpc && request.has_row_batch() &&
+        request.row_batch().has_tuple_data() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_tuple<
+                PTabletWriterAddBatchRequest, ReusableClosure<PTabletWriterAddBatchResult>>(
+                &request, _add_batch_closure);
+        if (!st.ok()) {
+            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+            _add_batch_closure->clear_in_flight();
+            return;
+        }
+        std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                                          "http");
+        _add_batch_closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/tablet_writer_add_batch_by_http";
+        _add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _add_batch_closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->tablet_writer_add_batch_by_http(
+                &_add_batch_closure->cntl, NULL, &_add_batch_closure->result, _add_batch_closure);
+    } else {
+        _add_batch_closure->cntl.http_request().Clear();
+        _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
+                                       &_add_batch_closure->result, _add_batch_closure);
     }
-    _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result,
-                                   _add_batch_closure);
-
     _next_packet_seq++;
 }
 
@@ -648,7 +663,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
     } else {
         *status = Status::OK();
     }
-    _transfer_data_by_brpc_attachment = config::transfer_data_by_brpc_attachment;
+    _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
 }
 
 OlapTableSink::~OlapTableSink() {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 47b519c41a..8b3186e0c1 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -304,16 +304,9 @@ protected:
     std::mutex _closed_lock;
     bool _is_closed = false;
 
-private:
-    // buffer for saving serialized row batch data.
-    // In the non-attachment approach, we need to use two PRowBatch structures alternately
-    // so that when one PRowBatch is sent, the other PRowBatch can be used for the serialization of the next RowBatch.
-    // This is not necessary with the attachment approach, because the memory structures
-    // are already copied into attachment memory before sending, and will wait for
-    // the previous RPC to be fully completed before the next copy.
-    std::string _tuple_data_buffer;
-    std::string* _tuple_data_buffer_ptr = nullptr;
+    RuntimeState* _state;
 
+private:
     std::unique_ptr<RowBatch> _cur_batch;
     PTabletWriterAddBatchRequest _cur_add_batch_request;
     using AddBatchReq = std::pair<std::unique_ptr<RowBatch>, PTabletWriterAddBatchRequest>;
@@ -529,8 +522,8 @@ protected:
     // Save the status of close() method
     Status _close_status;
 
-    // TODO(cmy): this should be removed after we switch to rpc attachment by default.
-    bool _transfer_data_by_brpc_attachment = false;
+    // User can change this config at runtime, avoid it being modified during query or loading process.
+    bool _transfer_large_data_by_brpc = false;
 
     // FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we
     // should compute tablet index for every row
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 9914fb99d9..ca27660bfb 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -84,6 +84,7 @@ DataStreamSender::Channel::~Channel() {
     }
     // release this before request desctruct
     _brpc_request.release_finst_id();
+    _brpc_request.release_query_id();
 }
 
 Status DataStreamSender::Channel::init(RuntimeState* state) {
@@ -103,6 +104,11 @@ Status DataStreamSender::Channel::init(RuntimeState* state) {
     _finst_id.set_hi(_fragment_instance_id.hi);
     _finst_id.set_lo(_fragment_instance_id.lo);
     _brpc_request.set_allocated_finst_id(&_finst_id);
+
+    _query_id.set_hi(state->query_id().hi);
+    _query_id.set_lo(state->query_id().lo);
+    _brpc_request.set_allocated_query_id(&_query_id);
+
     _brpc_request.set_node_id(_dest_node_id);
     _brpc_request.set_sender_id(_parent->_sender_id);
     _brpc_request.set_be_number(_be_number);
@@ -122,6 +128,7 @@ Status DataStreamSender::Channel::init(RuntimeState* state) {
             return Status::InternalError(msg);
         }
     }
+    _state = state;
     return Status::OK();
 }
 
@@ -149,12 +156,28 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
 
-    if (_parent->_transfer_data_by_brpc_attachment && _brpc_request.has_row_batch()) {
-        request_row_batch_transfer_attachment<PTransmitDataParams,
-                                              RefCountClosure<PTransmitDataResult>>(
-                &_brpc_request, _parent->_tuple_data_buffer, _closure);
+    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_row_batch() &&
+        _brpc_request.row_batch().has_tuple_data() &&
+        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_tuple<PTransmitDataParams,
+                                                           RefCountClosure<PTransmitDataResult>>(
+                &_brpc_request, _closure);
+        RETURN_IF_ERROR(st);
+        std::string brpc_url =
+                fmt::format("http://{}:{}", _brpc_dest_addr.hostname, _brpc_dest_addr.port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                                          "http");
+        _closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/transmit_data_by_http";
+        _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->transmit_data_by_http(&_closure->cntl, NULL, &_closure->result, _closure);
+    } else {
+        _closure->cntl.http_request().Clear();
+        _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
     }
-    _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
+
     if (batch != nullptr) {
         _brpc_request.release_row_batch();
     }
@@ -274,12 +297,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes
           _sender_id(sender_id),
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
-          _local_bytes_send_counter(nullptr),
-          _transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
-    if (_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer;
-    }
-}
+          _local_bytes_send_counter(nullptr) {}
 
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
                                    const TDataStreamSink& sink,
@@ -298,11 +316,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes
           _part_type(sink.output_partition.type),
           _ignore_not_found(sink.__isset.ignore_not_found ? sink.ignore_not_found : true),
           _dest_node_id(sink.dest_node_id),
-          _transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
-    if (_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer;
-    }
-
+          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -672,7 +686,7 @@ Status DataStreamSender::serialize_batch(RowBatch* src, PRowBatch* dest, int num
         SCOPED_TIMER(_serialize_batch_timer);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes,
-                                       _tuple_data_buffer_ptr));
+                                       _transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
     }
diff --git a/be/src/runtime/data_stream_sender.h b/be/src/runtime/data_stream_sender.h
index 54ba6aa823..3e743bc007 100644
--- a/be/src/runtime/data_stream_sender.h
+++ b/be/src/runtime/data_stream_sender.h
@@ -199,8 +199,9 @@ protected:
 
         TNetworkAddress _brpc_dest_addr;
 
-        // TODO(zc): initused for brpc
+        // TODO(zc): init used for brpc
         PUniqueId _finst_id;
+        PUniqueId _query_id;
 
         // serialized batches for broadcasting; we need two so we can write
         // one while the other one is still being sent.
@@ -213,6 +214,7 @@ protected:
         PTransmitDataParams _brpc_request;
         std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
         RefCountClosure<PTransmitDataResult>* _closure = nullptr;
+        RuntimeState* _state;
         int32_t _brpc_timeout_ms = 500;
         // whether the dest can be treated as query statistics transfer chain.
         bool _is_transfer_chain;
@@ -259,14 +261,6 @@ private:
     PRowBatch _pb_batch1;
     PRowBatch _pb_batch2;
 
-    // This buffer is used to store the serialized rowbatch data.
-    // Only works when `config::transfer_data_by_brpc_attachment` is true.
-    // The data in the buffer is copied to the attachment of the brpc when it is sent,
-    // to avoid an extra pb serialization in the brpc.
-    // _tuple_data_buffer_ptr will point to _tuple_data_buffer if `config::transfer_data_by_brpc_attachment` is true.
-    std::string _tuple_data_buffer;
-    std::string* _tuple_data_buffer_ptr = nullptr;
-
     std::vector<ExprContext*> _partition_expr_ctxs; // compute per-row partition values
 
     // map from range value to partition_id
@@ -281,7 +275,8 @@ private:
     // Identifier of the destination plan node.
     PlanNodeId _dest_node_id;
 
-    bool _transfer_data_by_brpc_attachment = false;
+    // User can change this config at runtime, avoid it being modified during query or loading process.
+    bool _transfer_large_data_by_brpc = false;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 232fc23acf..8627542a85 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -210,7 +210,7 @@ RowBatch::~RowBatch() {
 }
 
 Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
-                           size_t* compressed_size, std::string* allocated_buf) {
+                           size_t* compressed_size, bool allow_transfer_large_data) {
     // num_rows
     output_batch->set_num_rows(_num_rows);
     // row_tuples
@@ -226,18 +226,8 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
     output_batch->set_is_compressed(false);
     // tuple data
     size_t tuple_byte_size = total_byte_size();
-    std::string* mutable_tuple_data = nullptr;
-    if (allocated_buf != nullptr) {
-        allocated_buf->resize(tuple_byte_size);
-        // all tuple data will be written in the allocated_buf
-        // instead of tuple_data in PRowBatch
-        mutable_tuple_data = allocated_buf;
-        // tuple_data is a required field
-        output_batch->set_tuple_data("");
-    } else {
-        mutable_tuple_data = output_batch->mutable_tuple_data();
-        mutable_tuple_data->resize(tuple_byte_size);
-    }
+    std::string* mutable_tuple_data = output_batch->mutable_tuple_data();
+    mutable_tuple_data->resize(tuple_byte_size);
 
     // Copy tuple data, including strings, into output_batch (converting string
     // pointers into offsets in the process)
@@ -300,17 +290,12 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
 
     // return compressed and uncompressed size
     size_t pb_size = get_batch_size(*output_batch);
-    if (allocated_buf == nullptr) {
-        *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
-        *compressed_size = pb_size;
-        if (pb_size > std::numeric_limits<int32_t>::max()) {
-            // the protobuf has a hard limit of 2GB for serialized data.
-            return Status::InternalError(fmt::format(
-                    "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
-        }
-    } else {
-        *uncompressed_size = pb_size + tuple_byte_size;
-        *compressed_size = pb_size + mutable_tuple_data->size();
+    *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
+    *compressed_size = pb_size;
+    if (!allow_transfer_large_data && pb_size > std::numeric_limits<int32_t>::max()) {
+        // the protobuf has a hard limit of 2GB for serialized data.
+        return Status::InternalError(fmt::format(
+                "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 0fd79df7f0..26a9f8b534 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -353,10 +353,8 @@ public:
     // This function does not reset().
     // Returns the uncompressed serialized size (this will be the true size of output_batch
     // if tuple_data is actually uncompressed).
-    // if allocated_buf is not null, the serialized tuple data will be saved in this buf
-    // instead of `tuple_data` in PRowBatch.
     Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size,
-                     std::string* allocated_buf = nullptr);
+                     bool allow_transfer_large_data = false);
 
     // Utility function: returns total size of batch.
     static size_t get_batch_size(const PRowBatch& batch);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 82c0f2ba1c..76ee592b6e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -49,6 +49,28 @@ static void thread_context_deleter(void* d) {
     delete static_cast<ThreadContext*>(d);
 }
 
+template <typename T>
+class NewHttpClosure : public ::google::protobuf::Closure {
+public:
+    NewHttpClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {}
+    ~NewHttpClosure() {}
+
+    void Run() {
+        if (_request != nullptr) {
+            delete _request;
+            _request = nullptr;
+        }
+        if (_done != nullptr) {
+            _done->Run();
+        }
+        delete this;
+    }
+
+private:
+    T* _request = nullptr;
+    google::protobuf::Closure* _done = nullptr;
+};
+
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env), _tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
     REGISTER_HOOK_METRIC(add_batch_task_queue_size,
@@ -66,19 +88,56 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b
                                          PTransmitDataResult* response,
                                          google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
-    VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
-             << " node=" << request->node_id();
+    // TODO(zxy) delete in 1.2 version
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl);
+
+    _transmit_data(cntl_base, request, response, done, Status::OK());
+}
+
+void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController* cntl_base,
+                                                 const PEmptyRequest* request,
+                                                 PTransmitDataResult* response,
+                                                 google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
+    PTransmitDataParams* request_raw = new PTransmitDataParams();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTransmitDataParams>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_tuple<PTransmitDataParams>(request_raw, cntl);
+    _transmit_data(cntl_base, request_raw, response, done_raw, st);
+}
+
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base,
+                                          const PTransmitDataParams* request,
+                                          PTransmitDataResult* response,
+                                          google::protobuf::Closure* done,
+                                          const Status& extract_st) {
+    std::string query_id;
+    if (request->has_query_id()) {
+        query_id = print_id(request->query_id());
+        TUniqueId finst_id;
+        finst_id.__set_hi(request->finst_id().hi());
+        finst_id.__set_lo(request->finst_id().lo());
+        SCOPED_ATTACH_TASK_THREAD(
+                ThreadContext::TaskType::QUERY, query_id, finst_id,
+                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
+    }
+    VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
+             << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_data(),
     // give response a default value to avoid null pointers in high concurrency.
     Status st;
     st.to_protobuf(response->mutable_status());
-    st = _exec_env->stream_mgr()->transmit_data(request, &done);
-    if (!st.ok()) {
-        LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
-                     << ", fragment_instance_id=" << print_id(request->finst_id())
-                     << ", node=" << request->node_id();
+    if (extract_st.ok()) {
+        st = _exec_env->stream_mgr()->transmit_data(request, &done);
+        if (!st.ok()) {
+            LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
+                         << ", fragment_instance_id=" << print_id(request->finst_id())
+                         << ", node=" << request->node_id();
+        }
+    } else {
+        st = extract_st;
     }
     if (done != nullptr) {
         st.to_protobuf(response->mutable_status());
@@ -141,11 +200,38 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
                                                    const PTabletWriterAddBlockRequest* request,
                                                    PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* done) {
+    // TODO(zxy) delete in 1.2 version
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
+
+    _tablet_writer_add_block(cntl_base, request, response, done);
+}
+
+void PInternalServiceImpl::tablet_writer_add_block_by_http(
+        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
+        PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
+    PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTabletWriterAddBlockRequest>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(request_raw,
+                                                                                       cntl);
+    if (st.ok()) {
+        _tablet_writer_add_block(cntl_base, request_raw, response, done_raw);
+    } else {
+        st.to_protobuf(response->mutable_status());
+    }
+}
+
+void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
+                                                    const PTabletWriterAddBlockRequest* request,
+                                                    PTabletWriterAddBlockResult* response,
+                                                    google::protobuf::Closure* done) {
     VLOG_RPC << "tablet writer add block, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
              << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
     int64_t submit_task_time_ns = MonotonicNanos();
-    _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() {
+    _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
@@ -153,8 +239,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
             SCOPED_RAW_TIMER(&execution_time_ns);
             SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
                                       _exec_env->load_channel_mgr()->mem_tracker());
-            brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
-            attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
+
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add block failed, message=" << st.get_error_msg()
@@ -173,6 +258,29 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll
                                                    const PTabletWriterAddBatchRequest* request,
                                                    PTabletWriterAddBatchResult* response,
                                                    google::protobuf::Closure* done) {
+    _tablet_writer_add_batch(cntl_base, request, response, done);
+}
+
+void PInternalServiceImpl::tablet_writer_add_batch_by_http(
+        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
+        PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
+    PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_tuple<PTabletWriterAddBatchRequest>(request_raw,
+                                                                                       cntl);
+    if (st.ok()) {
+        _tablet_writer_add_batch(cntl_base, request_raw, response, done_raw);
+    } else {
+        st.to_protobuf(response->mutable_status());
+    }
+}
+
+void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
+                                                    const PTabletWriterAddBatchRequest* request,
+                                                    PTabletWriterAddBatchResult* response,
+                                                    google::protobuf::Closure* done) {
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
              << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
@@ -188,8 +296,10 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll
             SCOPED_RAW_TIMER(&execution_time_ns);
             SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
                                       _exec_env->load_channel_mgr()->mem_tracker());
+            // TODO(zxy) delete in 1.2 version
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
             attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
+
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
@@ -497,19 +607,56 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
     SCOPED_SWITCH_BTHREAD();
-    VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
-             << " node=" << request->node_id();
+    // TODO(zxy) delete in 1.2 version
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+
+    _transmit_block(cntl_base, request, response, done, Status::OK());
+}
+
+void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base,
+                                                  const PEmptyRequest* request,
+                                                  PTransmitDataResult* response,
+                                                  google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
+    PTransmitDataParams* request_raw = new PTransmitDataParams();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTransmitDataParams>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_block<PTransmitDataParams>(request_raw, cntl);
+    _transmit_block(cntl_base, request_raw, response, done_raw, st);
+}
+
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base,
+                                           const PTransmitDataParams* request,
+                                           PTransmitDataResult* response,
+                                           google::protobuf::Closure* done,
+                                           const Status& extract_st) {
+    std::string query_id;
+    if (request->has_query_id()) {
+        query_id = print_id(request->query_id());
+        TUniqueId finst_id;
+        finst_id.__set_hi(request->finst_id().hi());
+        finst_id.__set_lo(request->finst_id().lo());
+        SCOPED_ATTACH_TASK_THREAD(
+                ThreadContext::TaskType::QUERY, query_id, finst_id,
+                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
+    }
+    VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
+             << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_block(),
     // give response a default value to avoid null pointers in high concurrency.
     Status st;
     st.to_protobuf(response->mutable_status());
-    st = _exec_env->vstream_mgr()->transmit_block(request, &done);
-    if (!st.ok()) {
-        LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg()
-                     << ", fragment_instance_id=" << print_id(request->finst_id())
-                     << ", node=" << request->node_id();
+    if (extract_st.ok()) {
+        st = _exec_env->vstream_mgr()->transmit_block(request, &done);
+        if (!st.ok()) {
+            LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg()
+                         << ", fragment_instance_id=" << print_id(request->finst_id())
+                         << ", node=" << request->node_id();
+        }
+    } else {
+        st = extract_st;
     }
     if (done != nullptr) {
         st.to_protobuf(response->mutable_status());
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 32fcead91b..b23ba3c70a 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -40,6 +40,11 @@ public:
                        ::doris::PTransmitDataResult* response,
                        ::google::protobuf::Closure* done) override;
 
+    void transmit_data_by_http(::google::protobuf::RpcController* controller,
+                               const ::doris::PEmptyRequest* request,
+                               ::doris::PTransmitDataResult* response,
+                               ::google::protobuf::Closure* done) override;
+
     void exec_plan_fragment(google::protobuf::RpcController* controller,
                             const PExecPlanFragmentRequest* request,
                             PExecPlanFragmentResult* result,
@@ -73,11 +78,21 @@ public:
                                  PTabletWriterAddBatchResult* response,
                                  google::protobuf::Closure* done) override;
 
+    void tablet_writer_add_batch_by_http(google::protobuf::RpcController* controller,
+                                         const ::doris::PEmptyRequest* request,
+                                         PTabletWriterAddBatchResult* response,
+                                         google::protobuf::Closure* done) override;
+
     void tablet_writer_add_block(google::protobuf::RpcController* controller,
                                  const PTabletWriterAddBlockRequest* request,
                                  PTabletWriterAddBlockResult* response,
                                  google::protobuf::Closure* done) override;
 
+    void tablet_writer_add_block_by_http(google::protobuf::RpcController* controller,
+                                         const ::doris::PEmptyRequest* request,
+                                         PTabletWriterAddBlockResult* response,
+                                         google::protobuf::Closure* done) override;
+
     void tablet_writer_cancel(google::protobuf::RpcController* controller,
                               const PTabletWriterCancelRequest* request,
                               PTabletWriterCancelResult* response,
@@ -109,6 +124,10 @@ public:
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response,
                         ::google::protobuf::Closure* done) override;
+    void transmit_block_by_http(::google::protobuf::RpcController* controller,
+                                const ::doris::PEmptyRequest* request,
+                                ::doris::PTransmitDataResult* response,
+                                ::google::protobuf::Closure* done) override;
 
     void send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request,
                    PSendDataResult* response, google::protobuf::Closure* done) override;
@@ -136,6 +155,26 @@ private:
 
     Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);
 
+    void _transmit_data(::google::protobuf::RpcController* controller,
+                        const ::doris::PTransmitDataParams* request,
+                        ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
+                        const Status& extract_st);
+
+    void _transmit_block(::google::protobuf::RpcController* controller,
+                         const ::doris::PTransmitDataParams* request,
+                         ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
+                         const Status& extract_st);
+
+    void _tablet_writer_add_batch(google::protobuf::RpcController* controller,
+                                  const PTabletWriterAddBatchRequest* request,
+                                  PTabletWriterAddBatchResult* response,
+                                  google::protobuf::Closure* done);
+
+    void _tablet_writer_add_block(google::protobuf::RpcController* controller,
+                                  const PTabletWriterAddBlockRequest* request,
+                                  PTabletWriterAddBlockResult* response,
+                                  google::protobuf::Closure* done);
+
 private:
     ExecEnv* _exec_env;
     PriorityThreadPool _tablet_worker_pool;
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index d7c2369e0b..b28679544f 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -71,9 +71,23 @@ public:
         }
 
         // new one stub and insert into map
+        auto stub = get_new_client_no_cache(host_port);
+        _stub_map.try_emplace_l(
+                host_port, [&stub](typename StubMap<T>::mapped_type& v) { stub = v; }, stub);
+        return stub;
+    }
+
+    std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
+                                               const std::string& protocol = "baidu_std",
+                                               const std::string& connect_type = "") {
         brpc::ChannelOptions options;
         if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
             options.protocol = config::function_service_protocol;
+        } else {
+            options.protocol = protocol;
+        }
+        if (connect_type != "") {
+            options.connection_type = connect_type;
         }
         std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
         int ret_code = 0;
@@ -86,11 +100,7 @@ public:
         if (ret_code) {
             return nullptr;
         }
-        auto stub = std::make_shared<T>(channel.release(),
-                                        google::protobuf::Service::STUB_OWNS_CHANNEL);
-        _stub_map.try_emplace_l(
-                host_port, [&stub](typename StubMap<T>::mapped_type& v) { stub = v; }, stub);
-        return stub;
+        return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
     }
 
     size_t size() { return _stub_map.size(); }
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index ee9213dc3a..1e645afd9b 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -17,10 +17,16 @@
 
 #pragma once
 
-#include "util/stack_util.h"
-
 namespace doris {
 
+// When the tuple/block data is greater than 2G, embed the tuple/block data
+// and the request serialization string in the attachment, and use "http" brpc.
+// "http" brpc requires that only one of request and attachment be non-null.
+//
+// 2G: In the default "baidu_std" brpc, the upper limit of the request and attachment length is 2G.
+constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31);
+
+// TODO(zxy) delete in v1.3 version
 // Transfer RowBatch in ProtoBuf Request to Controller Attachment.
 // This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
 // and it is expected that performance can be improved.
@@ -35,6 +41,7 @@ inline void request_row_batch_transfer_attachment(Params* brpc_request,
     closure->cntl.request_attachment().swap(attachment);
 }
 
+// TODO(zxy) delete in v1.3 version
 // Transfer Block in ProtoBuf Request to Controller Attachment.
 // This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
 // and it is expected that performance can be improved.
@@ -49,6 +56,7 @@ inline void request_block_transfer_attachment(Params* brpc_request,
     closure->cntl.request_attachment().swap(attachment);
 }
 
+// TODO(zxy) delete in v1.3 version
 // Controller Attachment transferred to RowBatch in ProtoBuf Request.
 template <typename Params>
 inline void attachment_transfer_request_row_batch(const Params* brpc_request,
@@ -62,6 +70,7 @@ inline void attachment_transfer_request_row_batch(const Params* brpc_request,
     }
 }
 
+// TODO(zxy) delete in v1.3 version
 // Controller Attachment transferred to Block in ProtoBuf Request.
 template <typename Params>
 inline void attachment_transfer_request_block(const Params* brpc_request, brpc::Controller* cntl) {
@@ -74,4 +83,101 @@ inline void attachment_transfer_request_block(const Params* brpc_request, brpc::
     }
 }
 
+// Embed tuple_data and brpc request serialization string in controller attachment.
+template <typename Params, typename Closure>
+inline Status request_embed_attachment_contain_tuple(Params* brpc_request, Closure* closure) {
+    auto row_batch = brpc_request->row_batch();
+    Status st = request_embed_attachment(brpc_request, row_batch.tuple_data(), closure);
+    row_batch.set_tuple_data("");
+    return st;
+}
+
+// Embed column_values and brpc request serialization string in controller attachment.
+template <typename Params, typename Closure>
+inline Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) {
+    auto block = brpc_request->block();
+    Status st = request_embed_attachment(brpc_request, block.column_values(), closure);
+    block.set_column_values("");
+    return st;
+}
+
+template <typename Params, typename Closure>
+inline Status request_embed_attachment(Params* brpc_request, const std::string& data,
+                                       Closure* closure) {
+    butil::IOBuf attachment;
+
+    // step1: serialize brpc_request to string, and append to attachment.
+    std::string req_str;
+    brpc_request->SerializeToString(&req_str);
+    int64_t req_str_size = req_str.size();
+    attachment.append(&req_str_size, sizeof(req_str_size));
+    attachment.append(req_str);
+
+    // step2: append data to attachment and put it in the closure.
+    int64_t data_size = data.size();
+    attachment.append(&data_size, sizeof(data_size));
+    try {
+        attachment.append(data);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        LOG(WARNING) << "Try to alloc " << data_size
+                     << " bytes for append data to attachment failed. "
+                     << (p ? p.__cxa_exception_type()->name() : "null");
+        return Status::MemoryAllocFailed(
+                fmt::format("request embed attachment failed to memcpy {} bytes", data_size));
+    }
+    // step3: attachment add to closure.
+    closure->cntl.request_attachment().swap(attachment);
+    return Status::OK();
+}
+
+// Extract the brpc request and tuple data from the controller attachment,
+// and put the tuple data into the request.
+template <typename Params>
+inline Status attachment_extract_request_contain_tuple(const Params* brpc_request,
+                                                       brpc::Controller* cntl) {
+    Params* req = const_cast<Params*>(brpc_request);
+    auto rb = req->mutable_row_batch();
+    return attachment_extract_request(req, cntl, rb->mutable_tuple_data());
+}
+
+// Extract the brpc request and block from the controller attachment,
+// and put the block into the request.
+template <typename Params>
+inline Status attachment_extract_request_contain_block(const Params* brpc_request,
+                                                       brpc::Controller* cntl) {
+    Params* req = const_cast<Params*>(brpc_request);
+    auto block = req->mutable_block();
+    return attachment_extract_request(req, cntl, block->mutable_column_values());
+}
+
+template <typename Params>
+inline Status attachment_extract_request(const Params* brpc_request, brpc::Controller* cntl,
+                                         std::string* data) {
+    const butil::IOBuf& io_buf = cntl->request_attachment();
+
+    // step1: deserialize request string to brpc_request from attachment.
+    int64_t req_str_size;
+    io_buf.copy_to(&req_str_size, sizeof(req_str_size), 0);
+    std::string req_str;
+    io_buf.copy_to(&req_str, req_str_size, sizeof(req_str_size));
+    Params* req = const_cast<Params*>(brpc_request);
+    req->ParseFromString(req_str);
+
+    // step2: extract data from attachment.
+    int64_t data_size;
+    io_buf.copy_to(&data_size, sizeof(data_size), sizeof(req_str_size) + req_str_size);
+    try {
+        io_buf.copy_to(data, data_size, sizeof(data_size) + sizeof(req_str_size) + req_str_size);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        LOG(WARNING) << "Try to alloc " << data_size
+                     << " bytes for extract data from attachment failed. "
+                     << (p ? p.__cxa_exception_type()->name() : "null");
+        return Status::MemoryAllocFailed(
+                fmt::format("attachment extract request failed to memcpy {} bytes", data_size));
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 084a44611f..3e50c1578b 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -666,7 +666,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
 }
 
 Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
-                        std::string* allocated_buf) const {
+                        bool allow_transfer_large_data) const {
     // calc uncompressed size for allocation
     size_t content_uncompressed_size = 0;
     for (const auto& c : *this) {
@@ -678,17 +678,20 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
+    std::string* column_values = nullptr;
     try {
-        allocated_buf->resize(content_uncompressed_size);
+        column_values = pblock->mutable_column_values();
+        column_values->resize(content_uncompressed_size);
     } catch (...) {
         std::exception_ptr p = std::current_exception();
-        std::string msg = fmt::format("Try to alloc {} bytes for allocated_buf failed. reason {}",
-                                      content_uncompressed_size,
-                                      p ? p.__cxa_exception_type()->name() : "null");
+        std::string msg = fmt::format(
+                "Try to alloc {} bytes for pblock column values failed. reason {}",
+                content_uncompressed_size, p ? p.__cxa_exception_type()->name() : "null");
         LOG(WARNING) << msg;
         return Status::BufferAllocFailed(msg);
     }
-    char* buf = allocated_buf->data();
+    char* buf = column_values->data();
+
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf);
     }
@@ -713,12 +716,12 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
         }
         size_t compressed_size = 0;
         char* compressed_output = compression_scratch.data();
-        snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
+        snappy::RawCompress(column_values->data(), content_uncompressed_size, compressed_output,
                             &compressed_size);
 
         if (LIKELY(compressed_size < content_uncompressed_size)) {
             compression_scratch.resize(compressed_size);
-            allocated_buf->swap(compression_scratch);
+            column_values->swap(compression_scratch);
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
@@ -728,7 +731,7 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     }
-    if (*compressed_bytes >= std::numeric_limits<int32_t>::max()) {
+    if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
         return Status::InternalError(fmt::format(
                 "The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes));
     }
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 5f62365382..c15a6728c4 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -258,7 +258,7 @@ public:
 
     // serialize block to PBlock
     Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
-                     std::string* allocated_buf) const;
+                     bool allow_transfer_large_data = false) const;
 
     // serialize block to PRowbatch
     void serialize(RowBatch*, const RowDescriptor&);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 5153327644..e7d84c1b48 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -51,6 +51,11 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
     _finst_id.set_hi(_fragment_instance_id.hi);
     _finst_id.set_lo(_fragment_instance_id.lo);
     _brpc_request.set_allocated_finst_id(&_finst_id);
+
+    _query_id.set_hi(state->query_id().hi);
+    _query_id.set_lo(state->query_id().lo);
+    _brpc_request.set_allocated_query_id(&_query_id);
+
     _brpc_request.set_node_id(_dest_node_id);
     _brpc_request.set_sender_id(_parent->_sender_id);
     _brpc_request.set_be_number(_be_number);
@@ -68,6 +73,7 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
+    _state = state;
     return Status::OK();
 }
 
@@ -138,13 +144,27 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) {
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
 
-    if (_brpc_request.has_block()) {
-        request_block_transfer_attachment<PTransmitDataParams,
-                                          RefCountClosure<PTransmitDataResult>>(
-                &_brpc_request, _parent->_column_values_buffer, _closure);
+    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_block() &&
+        _brpc_request.block().has_column_values() &&
+        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_block<PTransmitDataParams,
+                                                           RefCountClosure<PTransmitDataResult>>(
+                &_brpc_request, _closure);
+        RETURN_IF_ERROR(st);
+        std::string brpc_url =
+                fmt::format("http://{}:{}", _brpc_dest_addr.hostname, _brpc_dest_addr.port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                                          "http");
+        _closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
+        _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->transmit_block_by_http(&_closure->cntl, NULL, &_closure->result, _closure);
+    } else {
+        _closure->cntl.http_request().Clear();
+        _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
     }
-
-    _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
     if (block != nullptr) {
         _brpc_request.release_block();
     }
@@ -259,7 +279,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
-          _dest_node_id(sink.dest_node_id) {
+          _dest_node_id(sink.dest_node_id),
+          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -567,7 +588,7 @@ Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_rece
         dest->Clear();
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes,
-                                       &_column_values_buffer));
+                                       _transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 5a8d5d25cd..ce8a59711d 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -144,10 +144,8 @@ protected:
     // Identifier of the destination plan node.
     PlanNodeId _dest_node_id;
 
-    // This buffer is used to store the serialized block data
-    // The data in the buffer is copied to the attachment of the brpc when it is sent,
-    // to avoid an extra pb serialization in the brpc.
-    std::string _column_values_buffer;
+    // Users can change this config at runtime; keep a copy here so it does not change during a query or load.
+    bool _transfer_large_data_by_brpc = false;
 };
 
 // TODO: support local exechange
@@ -188,6 +186,7 @@ public:
         }
         // release this before request desctruct
         _brpc_request.release_finst_id();
+        _brpc_request.release_query_id();
     }
 
     // Initialize channel.
@@ -279,6 +278,7 @@ private:
     TNetworkAddress _brpc_dest_addr;
 
     PUniqueId _finst_id;
+    PUniqueId _query_id;
     PBlock _pb_block;
     PTransmitDataParams _brpc_request;
     std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
@@ -287,6 +287,7 @@ private:
     // whether the dest can be treated as query statistics transfer chain.
     bool _is_transfer_chain;
     bool _send_query_statistics_with_every_batch;
+    RuntimeState* _state;
 
     size_t _capacity;
     bool _is_local;
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 6129d9815e..68d0d08d5f 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -18,6 +18,7 @@
 #include "vec/sink/vtablet_sink.h"
 
 #include "runtime/thread_context.h"
+#include "util/brpc_client_cache.h"
 #include "util/debug/sanitizer_scopes.h"
 #include "util/doris_metrics.h"
 #include "util/proto_util.h"
@@ -232,7 +233,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         Status st = block.serialize(request.mutable_block(), &uncompressed_bytes, &compressed_bytes,
-                                    &_column_values_buffer);
+                                    _parent->_transfer_large_data_by_brpc);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
             _add_block_closure->clear_in_flight();
@@ -273,13 +274,31 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
     }
 
-    if (request.has_block()) {
-        request_block_transfer_attachment<PTabletWriterAddBlockRequest,
-                                          ReusableClosure<PTabletWriterAddBlockResult>>(
-                &request, _column_values_buffer, _add_block_closure);
+    if (_parent->_transfer_large_data_by_brpc && request.has_block() &&
+        request.block().has_column_values() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_block<
+                PTabletWriterAddBlockRequest, ReusableClosure<PTabletWriterAddBlockResult>>(
+                &request, _add_block_closure);
+        if (!st.ok()) {
+            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+            _add_block_closure->clear_in_flight();
+            return;
+        }
+        std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                                          "http");
+        _add_block_closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/tablet_writer_add_block_by_http";
+        _add_block_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _add_block_closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->tablet_writer_add_block_by_http(
+                &_add_block_closure->cntl, NULL, &_add_block_closure->result, _add_block_closure);
+    } else {
+        _add_block_closure->cntl.http_request().Clear();
+        _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
+                                       &_add_block_closure->result, _add_block_closure);
     }
-    _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request, &_add_block_closure->result,
-                                   _add_block_closure);
 
     _next_packet_seq++;
 }
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 65441d8069..f0286bd4a5 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -63,11 +63,6 @@ private:
             std::pair<std::unique_ptr<vectorized::MutableBlock>, PTabletWriterAddBlockRequest>;
     std::queue<AddBlockReq> _pending_blocks;
     ReusableClosure<PTabletWriterAddBlockResult>* _add_block_closure = nullptr;
-
-    // This buffer is used to store the serialized block data
-    // The data in the buffer is copied to the attachment of the brpc when it is sent,
-    // to avoid an extra pb serialization in the brpc.
-    std::string _column_values_buffer;
 };
 
 class OlapTableSink;
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index ffc7b4c039..fb9d2be897 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -163,13 +163,10 @@ TEST(BlockTest, RowBatchCovertToBlock) {
 void block_to_pb(const vectorized::Block& block, PBlock* pblock) {
     size_t uncompressed_bytes = 0;
     size_t compressed_bytes = 0;
-    std::string column_values_buffer;
-    Status st =
-            block.serialize(pblock, &uncompressed_bytes, &compressed_bytes, &column_values_buffer);
+    Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes);
     EXPECT_TRUE(st.ok());
     EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
-    EXPECT_EQ(compressed_bytes, column_values_buffer.size());
-    pblock->set_column_values(column_values_buffer);
+    EXPECT_EQ(compressed_bytes, pblock->column_values().size());
 
     const vectorized::ColumnWithTypeAndName& type_and_name =
             block.get_columns_with_type_and_name()[0];
diff --git a/docs/en/docs/admin-manual/config/be-config.md b/docs/en/docs/admin-manual/config/be-config.md
index ddb0e31f83..025eca2451 100644
--- a/docs/en/docs/admin-manual/config/be-config.md
+++ b/docs/en/docs/admin-manual/config/be-config.md
@@ -192,11 +192,11 @@ This configuration is mainly used to modify the parameter `socket_max_unwritten_
 
 Sometimes the query fails and an error message of `The server is overcrowded` will appear in the BE log. This means there are too many messages to buffer at the sender side, which may happen when the SQL needs to send large bitmap value. You can avoid this error by increasing the configuration.
 
-### `transfer_data_by_brpc_attachment`
+### `transfer_large_data_by_brpc`
 
 * Type: bool
-* Description: This configuration is used to control whether to transfer the RowBatch in the ProtoBuf Request to the Controller Attachment and then send it through brpc. When the length of ProtoBuf Request exceeds 2G, an error will be reported: Bad request, error_text=[E1003]Fail to compress request, Putting RowBatch in Controller Attachment will be faster and avoid this error.
-* Default value: false
+* Description: This configuration controls whether, when the length of the Tuple/Block data is greater than 2G, the protoBuf request is serialized and embedded together with the Tuple/Block data into the controller attachment and sent through http brpc. This avoids the error raised when the length of the protoBuf request exceeds 2G: Bad request, error_text=[E1003]Fail to compress request. In past versions, after putting Tuple/Block data in the attachment, it was sent through the default baidu_std brpc, [...]
+* Default value: true
 
 ### `brpc_num_threads`
 
diff --git a/docs/zh-CN/docs/admin-manual/config/be-config.md b/docs/zh-CN/docs/admin-manual/config/be-config.md
index 2d6599a284..8bfe0d8f7d 100644
--- a/docs/zh-CN/docs/admin-manual/config/be-config.md
+++ b/docs/zh-CN/docs/admin-manual/config/be-config.md
@@ -185,11 +185,11 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 
 有时查询失败,BE 日志中会出现 `The server is overcrowded` 的错误信息,表示连接上有过多的未发送数据。当查询需要发送较大的bitmap字段时,可能会遇到该问题,此时可能通过调大该配置避免该错误。
 
-### `transfer_data_by_brpc_attachment`
+### `transfer_large_data_by_brpc`
 
 * 类型: bool
-* 描述:该配置用来控制是否将ProtoBuf Request中的RowBatch转移到Controller Attachment后通过brpc发送。ProtoBuf Request的长度超过2G时会报错: Bad request, error_text=[E1003]Fail to compress request,将RowBatch放到Controller Attachment中将更快且避免这个错误。
-* 默认值:false
+* 描述:该配置用来控制是否在 Tuple/Block data 长度大于1.8G时,将 protoBuf request 序列化后和 Tuple/Block data 一起嵌入到 controller attachment 后通过 http brpc 发送。这是为了避免 protoBuf request 的长度超过2G时的错误:Bad request, error_text=[E1003]Fail to compress request。在过去的版本中,曾将 Tuple/Block data 放入 attachment 后通过默认的 baidu_std brpc 发送,但 attachment 超过2G时将被截断,通过 http brpc 发送不存在2G的限制。
+* 默认值:true
 
 ### `brpc_num_threads`
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 3cccbb9d5d..51f48f1a2a 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -46,6 +46,7 @@ message PTransmitDataParams {
     optional PBlock block = 9;
     // transfer the RowBatch to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
+    optional PUniqueId query_id = 11;
 };
 
 message PTransmitDataResult {
@@ -466,8 +467,11 @@ message PResetRPCChannelResponse {
     repeated string channels = 2;
 };
 
+message PEmptyRequest {};
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
+    rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
     // If #fragments of a query is < 3, use exec_plan_fragment directly.
     // If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start
     rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
@@ -477,7 +481,9 @@ service PBackendService {
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
     rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult);
+    rpc tablet_writer_add_batch_by_http(PEmptyRequest) returns (PTabletWriterAddBatchResult);
     rpc tablet_writer_add_block(PTabletWriterAddBlockRequest) returns (PTabletWriterAddBlockResult);
+    rpc tablet_writer_add_block_by_http(PEmptyRequest) returns (PTabletWriterAddBlockResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);
     rpc get_info(PProxyRequest) returns (PProxyResult); 
     rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
@@ -490,6 +496,7 @@ service PBackendService {
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
+    rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
     rpc check_rpc_channel(PCheckRPCChannelRequest) returns (PCheckRPCChannelResponse);
     rpc reset_rpc_channel(PResetRPCChannelRequest) returns (PResetRPCChannelResponse);
     rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org