Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/14 09:05:46 UTC

[incubator-doris] branch dev-1.0.1 updated (e11e7be26d -> bdfd1d091f)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


    from e11e7be26d [fix] Do not send drop task when replay drop table (#10062)
     new 1e16352526 [fix](brpc) Embed serialized request into the attachment and transmit it through http brpc (#9803)
     new 6383b60b5a [Bug] Fix bug push value predicate of unique table when have sequence column (#10060)
     new 1f17a2b9ae [fix] fix bug that show proc "/cluster_balance/history_tablets" return malformat error (#10073)
     new bdfd1d091f [BUGFIX] Fix wrong column types in result file sink (#10079)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/config.h                             |   8 +-
 be/src/exec/tablet_sink.cpp                        |  39 ++++--
 be/src/exec/tablet_sink.h                          |  14 +-
 be/src/olap/rowset/beta_rowset_reader.cpp          |  14 +-
 be/src/olap/rowset/beta_rowset_reader.h            |   2 +
 be/src/runtime/data_stream_sender.cpp              |  51 ++++---
 be/src/runtime/data_stream_sender.h                |  15 +-
 be/src/runtime/row_batch.cpp                       |  52 +++----
 be/src/runtime/row_batch.h                         |   4 +-
 be/src/service/internal_service.cpp                | 156 ++++++++++++++++++---
 be/src/service/internal_service.h                  |  29 ++++
 be/src/util/brpc_client_cache.h                    |  21 ++-
 be/src/util/proto_util.h                           | 110 ++++++++++++++-
 be/src/vec/core/block.cpp                          |  27 +++-
 be/src/vec/core/block.h                            |   3 +-
 be/src/vec/runtime/vfile_result_writer.cpp         |  17 +--
 be/src/vec/sink/vdata_stream_sender.cpp            |  38 +++--
 be/src/vec/sink/vdata_stream_sender.h              |   9 +-
 be/src/vec/sink/vtablet_sink.cpp                   |   2 +
 be/test/vec/core/block_test.cpp                    |  11 +-
 docs/en/administrator-guide/config/be_config.md    |   6 +-
 docs/zh-CN/administrator-guide/config/be_config.md |   6 +-
 .../common/proc/TabletSchedulerDetailProcDir.java  |  11 +-
 .../org/apache/doris/planner/HashJoinNode.java     |  22 +--
 .../org/apache/doris/planner/OlapScanNode.java     |  58 +++-----
 .../org/apache/doris/planner/PlanFragment.java     |   5 +-
 .../org/apache/doris/planner/ColocatePlanTest.java |   2 +-
 .../doris/planner/DistributedPlannerTest.java      |   8 +-
 .../planner/MaterializedViewFunctionTest.java      |  16 +--
 .../apache/doris/planner/ResourceTagQueryTest.java |   6 +-
 .../doris/planner/TableFunctionPlanTest.java       |   2 +-
 gensrc/proto/internal_service.proto                |   6 +
 .../data/performance/redundant_conjuncts.out       |  28 ++--
 33 files changed, 533 insertions(+), 265 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[incubator-doris] 01/04: [fix](brpc) Embed serialized request into the attachment and transmit it through http brpc (#9803)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1e16352526fa398dc6f3799d7256115c1313a73d
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Mon Jun 13 20:41:48 2022 +0800

    [fix](brpc) Embed serialized request into the attachment and transmit it through http brpc (#9803)
    
    When the length of `Tuple/Block data` exceeds 2G, serialize the protobuf request and embed it,
    together with the `Tuple/Block data`, into the controller attachment, then transmit it through http brpc.

    This avoids the error raised when the length of the protobuf request exceeds 2G:
    `Bad request, error_text=[E1003]Fail to compress request`.

    In #7164, `Tuple/Block data` was put into the attachment and sent via the default `baidu_std brpc`,
    but an attachment larger than 2G is truncated. Sending via `http brpc` has no 2G limit.

    #7921 also considered putting `Tuple/Block data` into the attachment by default, since this theoretically
    saves one serialization and improves performance. However, testing showed no performance gain,
    while peak memory usage increased because of the extra memory copy.
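
    A minimal sketch of the attachment framing used on the http brpc path (it mirrors
    request_embed_attachment() in be/src/util/proto_util.h below; the free-standing helper
    name is illustrative only, and Closure stands for any closure that exposes a
    brpc::Controller member `cntl`, such as RefCountClosure or ReusableClosure):

        #include "butil/iobuf.h"
        #include <string>

        // Layout: [int64 request size][serialized request][int64 data size][tuple/block data].
        // http brpc carries only the attachment, so the 2G protobuf body limit does not apply.
        template <typename Params, typename Closure>
        void embed_into_attachment(Params* brpc_request, const std::string& data, Closure* closure) {
            butil::IOBuf attachment;
            std::string req_str;
            brpc_request->SerializeToString(&req_str);               // request minus the big payload
            int64_t req_str_size = req_str.size();
            attachment.append(&req_str_size, sizeof(req_str_size));  // length-prefix the request
            attachment.append(req_str);
            int64_t data_size = data.size();
            attachment.append(&data_size, sizeof(data_size));        // length-prefix the payload
            attachment.append(data);                                  // tuple/block bytes, may exceed 2G
            closure->cntl.request_attachment().swap(attachment);     // receiver re-parses both parts
        }

    The receiving *_by_http endpoints reverse this framing via attachment_extract_request()
    and then fall through to the existing handlers.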
---
 be/src/common/config.h                             |   8 +-
 be/src/exec/tablet_sink.cpp                        |  39 ++++--
 be/src/exec/tablet_sink.h                          |  14 +-
 be/src/runtime/data_stream_sender.cpp              |  51 ++++---
 be/src/runtime/data_stream_sender.h                |  15 +-
 be/src/runtime/row_batch.cpp                       |  52 +++----
 be/src/runtime/row_batch.h                         |   4 +-
 be/src/service/internal_service.cpp                | 156 ++++++++++++++++++---
 be/src/service/internal_service.h                  |  29 ++++
 be/src/util/brpc_client_cache.h                    |  21 ++-
 be/src/util/proto_util.h                           | 110 ++++++++++++++-
 be/src/vec/core/block.cpp                          |  27 +++-
 be/src/vec/core/block.h                            |   3 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  38 +++--
 be/src/vec/sink/vdata_stream_sender.h              |   9 +-
 be/src/vec/sink/vtablet_sink.cpp                   |   2 +
 be/test/vec/core/block_test.cpp                    |  11 +-
 docs/en/administrator-guide/config/be_config.md    |   6 +-
 docs/zh-CN/administrator-guide/config/be_config.md |   6 +-
 gensrc/proto/internal_service.proto                |   6 +
 20 files changed, 457 insertions(+), 150 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index fad7459bef..32d19468bf 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -549,9 +549,11 @@ CONF_String(default_rowset_type, "BETA");
 CONF_Int64(brpc_max_body_size, "3147483648");
 // Max unwritten bytes in each socket, if the limit is reached, Socket.Write fails with EOVERCROWDED
 CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
-// Whether to transfer RowBatch in ProtoBuf Request to Controller Attachment and send it
-// through brpc, this will be faster and avoid the error of Request length overflow.
-CONF_mBool(transfer_data_by_brpc_attachment, "false");
+// TODO(zxy): expect to be true in v1.3
+// Whether to embed the ProtoBuf Request serialized string together with Tuple/Block data into
+// Controller Attachment and send it through http brpc when the length of the Tuple/Block data
+// is greater than 1.8G. This is to avoid the error of Request length overflow (2G).
+CONF_mBool(transfer_large_data_by_brpc, "false");
 
 // max number of txns for every txn_partition_map in txn manager
 // this is a self protection to avoid too many txns saving in manager
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index f40d1b0211..f02d8bd375 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -46,9 +46,6 @@ namespace stream_load {
 
 NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id)
         : _parent(parent), _index_channel(index_channel), _node_id(node_id) {
-    if (_parent->_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer;
-    }
 }
 
 NodeChannel::~NodeChannel() noexcept {
@@ -71,6 +68,7 @@ NodeChannel::~NodeChannel() noexcept {
 // returned directly via "TabletSink::prepare()" method.
 Status NodeChannel::init(RuntimeState* state) {
     _tuple_desc = _parent->_output_tuple_desc;
+    _state = state;
     auto node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
         std::stringstream ss;
@@ -495,7 +493,7 @@ void NodeChannel::try_send_batch() {
         SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
         Status st = row_batch->serialize(request.mutable_row_batch(), &uncompressed_bytes,
-                                         &compressed_bytes, _tuple_data_buffer_ptr);
+                                         &compressed_bytes, _parent->_transfer_large_data_by_brpc);
         if (!st.ok()) {
             cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
             _add_batch_closure->clear_in_flight();
@@ -539,14 +537,31 @@ void NodeChannel::try_send_batch() {
         CHECK(_pending_batches_num == 0) << _pending_batches_num;
     }
 
-    if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch()) {
-        request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
-                                              ReusableClosure<PTabletWriterAddBatchResult>>(
-                &request, _tuple_data_buffer, _add_batch_closure);
+    if (_parent->_transfer_large_data_by_brpc && request.has_row_batch() &&
+        request.row_batch().has_tuple_data() && request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_tuple<
+                PTabletWriterAddBatchRequest, ReusableClosure<PTabletWriterAddBatchResult>>(
+                &request, _add_batch_closure);
+        if (!st.ok()) {
+            cancel(fmt::format("{}, err: {}", channel_info(), st.get_error_msg()));
+            _add_batch_closure->clear_in_flight();
+            return;
+        }
+        std::string brpc_url = fmt::format("http://{}:{}", _node_info.host, _node_info.brpc_port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                                          "http");
+        _add_batch_closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/tablet_writer_add_batch_by_http";
+        _add_batch_closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _add_batch_closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->tablet_writer_add_batch_by_http(
+                &_add_batch_closure->cntl, NULL, &_add_batch_closure->result, _add_batch_closure);
+    } else {
+        _add_batch_closure->cntl.http_request().Clear();
+        _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
+                                       &_add_batch_closure->result, _add_batch_closure);
     }
-    _stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request, &_add_batch_closure->result,
-                                   _add_batch_closure);
-
     _next_packet_seq++;
 }
 
@@ -690,7 +705,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const RowDescriptor& row_desc,
         *status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs);
     }
     _name = "OlapTableSink";
-    _transfer_data_by_brpc_attachment = config::transfer_data_by_brpc_attachment;
+    _transfer_large_data_by_brpc = config::transfer_large_data_by_brpc;
 }
 
 OlapTableSink::~OlapTableSink() {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index a6af6c15e7..1029cecb4a 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -284,15 +284,6 @@ private:
     std::atomic<int64_t> _queue_push_lock_ns {0};
     std::atomic<int64_t> _actual_consume_ns {0};
 
-    // buffer for saving serialized row batch data.
-    // In the non-attachment approach, we need to use two PRowBatch structures alternately
-    // so that when one PRowBatch is sent, the other PRowBatch can be used for the serialization of the next RowBatch.
-    // This is not necessary with the attachment approach, because the memory structures
-    // are already copied into attachment memory before sending, and will wait for
-    // the previous RPC to be fully completed before the next copy.
-    std::string _tuple_data_buffer;
-    std::string* _tuple_data_buffer_ptr = nullptr;
-
     // the timestamp when this node channel be marked closed and finished closed
     uint64_t _close_time_ms = 0;
 
@@ -306,6 +297,7 @@ private:
     // The IndexChannel is definitely accessible until the NodeChannel is closed.
     std::mutex _closed_lock;
     bool _is_closed = false;
+    RuntimeState* _state;
 };
 
 class IndexChannel {
@@ -492,8 +484,8 @@ protected:
     // Save the status of close() method
     Status _close_status;
 
-    // TODO(cmy): this should be removed after we switch to rpc attachment by default.
-    bool _transfer_data_by_brpc_attachment = false;
+    // User can change this config at runtime, avoid it being modified during query or loading process.
+    bool _transfer_large_data_by_brpc = false;
 
     // FIND_TABLET_EVERY_ROW is used for both hash and random distribution info, which indicates that we
     // should compute tablet index for every row
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 93b0a128e6..ca095cef64 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -82,6 +82,7 @@ DataStreamSender::Channel::~Channel() {
     }
     // release this before request desctruct
     _brpc_request.release_finst_id();
+    _brpc_request.release_query_id();
 }
 
 Status DataStreamSender::Channel::init(RuntimeState* state) {
@@ -101,6 +102,11 @@ Status DataStreamSender::Channel::init(RuntimeState* state) {
     _finst_id.set_hi(_fragment_instance_id.hi);
     _finst_id.set_lo(_fragment_instance_id.lo);
     _brpc_request.set_allocated_finst_id(&_finst_id);
+
+    _query_id.set_hi(state->query_id().hi);
+    _query_id.set_lo(state->query_id().lo);
+    _brpc_request.set_allocated_query_id(&_query_id);
+
     _brpc_request.set_node_id(_dest_node_id);
     _brpc_request.set_sender_id(_parent->_sender_id);
     _brpc_request.set_be_number(_be_number);
@@ -120,6 +126,7 @@ Status DataStreamSender::Channel::init(RuntimeState* state) {
             return Status::InternalError(msg);
         }
     }
+    _state = state;
     return Status::OK();
 }
 
@@ -147,12 +154,28 @@ Status DataStreamSender::Channel::send_batch(PRowBatch* batch, bool eos) {
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
 
-    if (_parent->_transfer_data_by_brpc_attachment && _brpc_request.has_row_batch()) {
-        request_row_batch_transfer_attachment<PTransmitDataParams,
-            RefCountClosure<PTransmitDataResult>>(&_brpc_request, _parent->_tuple_data_buffer,
-                    _closure);
+    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_row_batch() &&
+        _brpc_request.row_batch().has_tuple_data() &&
+        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_tuple<PTransmitDataParams,
+                                                           RefCountClosure<PTransmitDataResult>>(
+                &_brpc_request, _closure);
+        RETURN_IF_ERROR(st);
+        std::string brpc_url =
+                fmt::format("http://{}:{}", _brpc_dest_addr.hostname, _brpc_dest_addr.port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                                          "http");
+        _closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/transmit_data_by_http";
+        _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->transmit_data_by_http(&_closure->cntl, NULL, &_closure->result, _closure);
+    } else {
+        _closure->cntl.http_request().Clear();
+        _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
     }
-    _brpc_stub->transmit_data(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
+
     if (batch != nullptr) {
         _brpc_request.release_row_batch();
     }
@@ -272,13 +295,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes
           _sender_id(sender_id),
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
-          _local_bytes_send_counter(nullptr),
-          _transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
-
-    if (_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer; 
-    }
-}
+          _local_bytes_send_counter(nullptr) {}
 
 DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
                                    const TDataStreamSink& sink,
@@ -297,12 +314,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const RowDes
           _part_type(sink.output_partition.type),
           _ignore_not_found(sink.__isset.ignore_not_found ? sink.ignore_not_found : true),
           _dest_node_id(sink.dest_node_id),
-          _transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
-
-    if (_transfer_data_by_brpc_attachment) {
-        _tuple_data_buffer_ptr = &_tuple_data_buffer; 
-    }
-
+          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -668,7 +680,8 @@ Status DataStreamSender::serialize_batch(RowBatch* src, PRowBatch* dest, int num
     {
         SCOPED_TIMER(_serialize_batch_timer);
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, _tuple_data_buffer_ptr));
+        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes,
+                                       _transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
     }
diff --git a/be/src/runtime/data_stream_sender.h b/be/src/runtime/data_stream_sender.h
index 1da9d32026..377d13f1fa 100644
--- a/be/src/runtime/data_stream_sender.h
+++ b/be/src/runtime/data_stream_sender.h
@@ -197,8 +197,9 @@ protected:
 
         TNetworkAddress _brpc_dest_addr;
 
-        // TODO(zc): initused for brpc
+        // TODO(zc): init used for brpc
         PUniqueId _finst_id;
+        PUniqueId _query_id;
 
         // serialized batches for broadcasting; we need two so we can write
         // one while the other one is still being sent.
@@ -211,6 +212,7 @@ protected:
         PTransmitDataParams _brpc_request;
         std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
         RefCountClosure<PTransmitDataResult>* _closure = nullptr;
+        RuntimeState* _state;
         int32_t _brpc_timeout_ms = 500;
         // whether the dest can be treated as query statistics transfer chain.
         bool _is_transfer_chain;
@@ -257,14 +259,6 @@ private:
     PRowBatch _pb_batch1;
     PRowBatch _pb_batch2;
 
-    // This buffer is used to store the serialized rowbatch data.
-    // Only works when `config::transfer_data_by_brpc_attachment` is true.
-    // The data in the buffer is copied to the attachment of the brpc when it is sent,
-    // to avoid an extra pb serialization in the brpc.
-    // _tuple_data_buffer_ptr will point to _tuple_data_buffer if `config::transfer_data_by_brpc_attachment` is true.
-    std::string _tuple_data_buffer;
-    std::string* _tuple_data_buffer_ptr = nullptr;
-
     std::vector<ExprContext*> _partition_expr_ctxs; // compute per-row partition values
 
     // map from range value to partition_id
@@ -279,7 +273,8 @@ private:
     // Identifier of the destination plan node.
     PlanNodeId _dest_node_id;
 
-    bool _transfer_data_by_brpc_attachment = false;
+    // User can change this config at runtime, avoid it being modified during query or loading process.
+    bool _transfer_large_data_by_brpc = false;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index cdbbebbfed..02d70b16e1 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -237,7 +237,7 @@ RowBatch::~RowBatch() {
 }
 
 Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
-                           size_t* compressed_size, std::string* allocated_buf) {
+                           size_t* compressed_size, bool allow_transfer_large_data) {
     // num_rows
     output_batch->set_num_rows(_num_rows);
     // row_tuples
@@ -252,19 +252,9 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
     // is_compressed
     output_batch->set_is_compressed(false);
     // tuple data
-    size_t size = total_byte_size();
-    std::string* mutable_tuple_data = nullptr;
-    if (allocated_buf != nullptr) {
-        allocated_buf->resize(size);
-        // all tuple data will be written in the allocated_buf
-        // instead of tuple_data in PRowBatch
-        mutable_tuple_data = allocated_buf;
-        // tuple_data is a required field
-        output_batch->set_tuple_data("");
-    } else {
-        mutable_tuple_data = output_batch->mutable_tuple_data();
-        mutable_tuple_data->resize(size);
-    }
+    size_t tuple_byte_size = total_byte_size();
+    std::string* mutable_tuple_data = output_batch->mutable_tuple_data();
+    mutable_tuple_data->resize(tuple_byte_size);
 
     // Copy tuple data, including strings, into output_batch (converting string
     // pointers into offsets in the process)
@@ -288,15 +278,15 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
             mutable_tuple_offsets->Add((int32_t) offset);
             mutable_new_tuple_offsets->Add(offset);
             row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /* convert_ptrs */ true);
-            CHECK_LE(offset, size);
+            CHECK_LE(offset, tuple_byte_size);
         }
     }
-    CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;
+    CHECK_EQ(offset, tuple_byte_size) << "offset: " << offset << " vs. size: " << tuple_byte_size;
 
-    if (config::compress_rowbatches && size > 0) {
+    if (config::compress_rowbatches && tuple_byte_size > 0) {
         // Try compressing tuple_data to _compression_scratch, swap if compressed data is
         // smaller
-        uint32_t max_compressed_size = snappy::MaxCompressedLength(size);
+        uint32_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
 
         if (_compression_scratch.size() < max_compressed_size) {
             _compression_scratch.resize(max_compressed_size);
@@ -304,33 +294,25 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
 
         size_t compressed_size = 0;
         char* compressed_output = _compression_scratch.data();
-        snappy::RawCompress(mutable_tuple_data->data(), size, compressed_output, &compressed_size);
+        snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size, compressed_output, &compressed_size);
 
-        if (LIKELY(compressed_size < size)) {
+        if (LIKELY(compressed_size < tuple_byte_size)) {
             _compression_scratch.resize(compressed_size);
             mutable_tuple_data->swap(_compression_scratch);
             output_batch->set_is_compressed(true);
         }
 
-        VLOG_ROW << "uncompressed size: " << size << ", compressed size: " << compressed_size;
+        VLOG_ROW << "uncompressed size: " << tuple_byte_size << ", compressed size: " << compressed_size;
     }
 
     // return compressed and uncompressed size
     size_t pb_size = get_batch_size(*output_batch);
-    if (allocated_buf == nullptr) {
-        *uncompressed_size = pb_size - mutable_tuple_data->size() + size;
-        *compressed_size = pb_size;
-        if (pb_size > std::numeric_limits<int32_t>::max()) {
-            // the protobuf has a hard limit of 2GB for serialized data.
-            return Status::InternalError(
-                    fmt::format("The rowbatch is large than 2GB({}), can not send by Protobuf. "
-                                "please set BE config 'transfer_data_by_brpc_attachment' to true "
-                                "and restart BE.",
-                                pb_size));
-        }
-    } else {
-        *uncompressed_size = pb_size + size;
-        *compressed_size = pb_size + mutable_tuple_data->size();
+    *uncompressed_size = pb_size - mutable_tuple_data->size() + tuple_byte_size;
+    *compressed_size = pb_size;
+    if (!allow_transfer_large_data && pb_size > std::numeric_limits<int32_t>::max()) {
+        // the protobuf has a hard limit of 2GB for serialized data.
+        return Status::InternalError(fmt::format(
+                "The rowbatch is large than 2GB({}), can not send by Protobuf.", pb_size));
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 070a1e578f..5aad1a0e62 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -351,10 +351,8 @@ public:
     // This function does not reset().
     // Returns the uncompressed serialized size (this will be the true size of output_batch
     // if tuple_data is actually uncompressed).
-    // if allocated_buf is not null, the serialized tuple data will be saved in this buf
-    // instead of `tuple_data` in PRowBatch.
     Status serialize(PRowBatch* output_batch, size_t* uncompressed_size, size_t* compressed_size,
-                     std::string* allocated_buf = nullptr);
+                     bool allow_transfer_large_data = false);
 
     // Utility function: returns total size of batch.
     static size_t get_batch_size(const PRowBatch& batch);
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index c1185591e6..c4c48a3c8b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -42,6 +42,28 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT);
 
+template <typename T>
+class NewHttpClosure : public ::google::protobuf::Closure {
+public:
+    NewHttpClosure(T* request, google::protobuf::Closure* done) : _request(request), _done(done) {}
+    ~NewHttpClosure() {}
+
+    void Run() {
+        if (_request != nullptr) {
+            delete _request;
+            _request = nullptr;
+        }
+        if (_done != nullptr) {
+            _done->Run();
+        }
+        delete this;
+    }
+
+private:
+    T* _request = nullptr;
+    google::protobuf::Closure* _done = nullptr;
+};
+
 template <typename T>
 PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env), _tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
@@ -63,15 +85,51 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_row_batch<PTransmitDataParams>(request, cntl);
+
+    _transmit_data(cntl_base, request, response, done, Status::OK());
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::transmit_data_by_http(google::protobuf::RpcController* cntl_base,
+                                                 const PEmptyRequest* request,
+                                                 PTransmitDataResult* response,
+                                                 google::protobuf::Closure* done) {
+    PTransmitDataParams* request_raw = new PTransmitDataParams();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTransmitDataParams>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_tuple<PTransmitDataParams>(request_raw, cntl);
+    _transmit_data(cntl_base, request_raw, response, done_raw, st);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::_transmit_data(google::protobuf::RpcController* cntl_base,
+                                          const PTransmitDataParams* request,
+                                          PTransmitDataResult* response,
+                                          google::protobuf::Closure* done,
+                                          const Status& extract_st) {
+    std::string query_id;
+    if (request->has_query_id()) {
+        query_id = print_id(request->query_id());
+        TUniqueId finst_id;
+        finst_id.__set_hi(request->finst_id().hi());
+        finst_id.__set_lo(request->finst_id().lo());
+    }
+    VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
+             << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_data(),
     // give response a default value to avoid null pointers in high concurrency.
     Status st;
     st.to_protobuf(response->mutable_status());
-    st = _exec_env->stream_mgr()->transmit_data(request, &done);
-    if (!st.ok()) {
-        LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
-                     << ", fragment_instance_id=" << print_id(request->finst_id())
-                     << ", node=" << request->node_id();
+    if (extract_st.ok()) {
+        st = _exec_env->stream_mgr()->transmit_data(request, &done);
+        if (!st.ok()) {
+            LOG(WARNING) << "transmit_data failed, message=" << st.get_error_msg()
+                         << ", fragment_instance_id=" << print_id(request->finst_id())
+                         << ", node=" << request->node_id();
+        }
+    } else {
+        st = extract_st;
     }
     if (done != nullptr) {
         st.to_protobuf(response->mutable_status());
@@ -133,9 +191,34 @@ void PInternalServiceImpl<T>::exec_plan_fragment_start(google::protobuf::RpcCont
 
 template <typename T>
 void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
-                                                      const PTabletWriterAddBatchRequest* request,
-                                                      PTabletWriterAddBatchResult* response,
-                                                      google::protobuf::Closure* done) {
+                                                   const PTabletWriterAddBatchRequest* request,
+                                                   PTabletWriterAddBatchResult* response,
+                                                   google::protobuf::Closure* done) {
+    _tablet_writer_add_batch(cntl_base, request, response, done);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::tablet_writer_add_batch_by_http(
+        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
+        PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
+    PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_tuple<PTabletWriterAddBatchRequest>(request_raw,
+                                                                                       cntl);
+    if (st.ok()) {
+        _tablet_writer_add_batch(cntl_base, request_raw, response, done_raw);
+    } else {
+        st.to_protobuf(response->mutable_status());
+    }
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::_tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
+                                                    const PTabletWriterAddBatchRequest* request,
+                                                    PTabletWriterAddBatchResult* response,
+                                                    google::protobuf::Closure* done) {
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
              << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
@@ -149,8 +232,10 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
+            // TODO(zxy) delete in 1.2 version
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
             attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
+
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
@@ -460,22 +545,57 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string& ser_reque
 
 template <typename T>
 void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cntl_base,
-                                             const PTransmitDataParams* request,
-                                             PTransmitDataResult* response,
-                                             google::protobuf::Closure* done) {
-    VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
-             << " node=" << request->node_id();
+                                          const PTransmitDataParams* request,
+                                          PTransmitDataResult* response,
+                                          google::protobuf::Closure* done) {
+    // TODO(zxy) delete in 1.2 version
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+
+    _transmit_block(cntl_base, request, response, done, Status::OK());
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::transmit_block_by_http(google::protobuf::RpcController* cntl_base,
+                                                  const PEmptyRequest* request,
+                                                  PTransmitDataResult* response,
+                                                  google::protobuf::Closure* done) {
+    PTransmitDataParams* request_raw = new PTransmitDataParams();
+    google::protobuf::Closure* done_raw =
+            new NewHttpClosure<PTransmitDataParams>(request_raw, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_block<PTransmitDataParams>(request_raw, cntl);
+    _transmit_block(cntl_base, request_raw, response, done_raw, st);
+}
+
+template <typename T>
+void PInternalServiceImpl<T>::_transmit_block(google::protobuf::RpcController* cntl_base,
+                                           const PTransmitDataParams* request,
+                                           PTransmitDataResult* response,
+                                           google::protobuf::Closure* done,
+                                           const Status& extract_st) {
+    std::string query_id;
+    if (request->has_query_id()) {
+        query_id = print_id(request->query_id());
+        TUniqueId finst_id;
+        finst_id.__set_hi(request->finst_id().hi());
+        finst_id.__set_lo(request->finst_id().lo());
+    }
+    VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id())
+             << " query_id=" << query_id << " node=" << request->node_id();
     // The response is accessed when done->Run is called in transmit_block(),
     // give response a default value to avoid null pointers in high concurrency.
     Status st;
     st.to_protobuf(response->mutable_status());
-    st = _exec_env->vstream_mgr()->transmit_block(request, &done);
-    if (!st.ok()) {
-        LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg()
-                     << ", fragment_instance_id=" << print_id(request->finst_id())
-                     << ", node=" << request->node_id();
+    if (extract_st.ok()) {
+        st = _exec_env->vstream_mgr()->transmit_block(request, &done);
+        if (!st.ok()) {
+            LOG(WARNING) << "transmit_block failed, message=" << st.get_error_msg()
+                         << ", fragment_instance_id=" << print_id(request->finst_id())
+                         << ", node=" << request->node_id();
+        }
+    } else {
+        st = extract_st;
     }
     if (done != nullptr) {
         st.to_protobuf(response->mutable_status());
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index 550773b950..0ca63aa5f9 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -41,6 +41,11 @@ public:
                        ::doris::PTransmitDataResult* response,
                        ::google::protobuf::Closure* done) override;
 
+    void transmit_data_by_http(::google::protobuf::RpcController* controller,
+                               const ::doris::PEmptyRequest* request,
+                               ::doris::PTransmitDataResult* response,
+                               ::google::protobuf::Closure* done) override;
+
     void exec_plan_fragment(google::protobuf::RpcController* controller,
                             const PExecPlanFragmentRequest* request,
                             PExecPlanFragmentResult* result,
@@ -74,6 +79,11 @@ public:
                                  PTabletWriterAddBatchResult* response,
                                  google::protobuf::Closure* done) override;
 
+    void tablet_writer_add_batch_by_http(google::protobuf::RpcController* controller,
+                                         const ::doris::PEmptyRequest* request,
+                                         PTabletWriterAddBatchResult* response,
+                                         google::protobuf::Closure* done) override;
+
     void tablet_writer_cancel(google::protobuf::RpcController* controller,
                               const PTabletWriterCancelRequest* request,
                               PTabletWriterCancelResult* response,
@@ -105,6 +115,10 @@ public:
                         const ::doris::PTransmitDataParams* request,
                         ::doris::PTransmitDataResult* response,
                         ::google::protobuf::Closure* done) override;
+    void transmit_block_by_http(::google::protobuf::RpcController* controller,
+                                const ::doris::PEmptyRequest* request,
+                                ::doris::PTransmitDataResult* response,
+                                ::google::protobuf::Closure* done) override;
 
     void send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request,
                    PSendDataResult* response, google::protobuf::Closure* done) override;
@@ -132,6 +146,21 @@ private:
 
     Status _fold_constant_expr(const std::string& ser_request, PConstantExprResult* response);
 
+    void _transmit_data(::google::protobuf::RpcController* controller,
+                        const ::doris::PTransmitDataParams* request,
+                        ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
+                        const Status& extract_st);
+
+    void _transmit_block(::google::protobuf::RpcController* controller,
+                         const ::doris::PTransmitDataParams* request,
+                         ::doris::PTransmitDataResult* response, ::google::protobuf::Closure* done,
+                         const Status& extract_st);
+
+    void _tablet_writer_add_batch(google::protobuf::RpcController* controller,
+                                  const PTabletWriterAddBatchRequest* request,
+                                  PTabletWriterAddBatchResult* response,
+                                  google::protobuf::Closure* done);
+
 private:
     ExecEnv* _exec_env;
     PriorityThreadPool _tablet_worker_pool;
diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h
index b15a3c6aeb..aef96b6ec6 100644
--- a/be/src/util/brpc_client_cache.h
+++ b/be/src/util/brpc_client_cache.h
@@ -71,9 +71,23 @@ public:
         }
 
         // new one stub and insert into map
+        auto stub = get_new_client_no_cache(host_port);
+        _stub_map.try_emplace_l(
+                host_port, [&stub](typename StubMap<T>::mapped_type& v) { stub = v; }, stub);
+        return stub;
+    }
+
+    std::shared_ptr<T> get_new_client_no_cache(const std::string& host_port,
+                                               const std::string& protocol = "baidu_std",
+                                               const std::string& connect_type = "") {
         brpc::ChannelOptions options;
         if constexpr (std::is_same_v<T, PFunctionService_Stub>) {
             options.protocol = config::function_service_protocol;
+        } else {
+            options.protocol = protocol;
+        }
+        if (connect_type != "") {
+            options.connection_type = connect_type;
         }
         std::unique_ptr<brpc::Channel> channel(new brpc::Channel());
         int ret_code = 0;
@@ -86,12 +100,7 @@ public:
         if (ret_code) {
             return nullptr;
         }
-        auto stub = std::make_shared<T>(channel.release(),
-                                        google::protobuf::Service::STUB_OWNS_CHANNEL);
-        _stub_map.try_emplace_l(host_port,
-                                [&stub](typename StubMap<T>::mapped_type& v) { stub = v; },
-                                stub);
-        return stub;
+        return std::make_shared<T>(channel.release(), google::protobuf::Service::STUB_OWNS_CHANNEL);
     }
 
     inline size_t size() { return _stub_map.size(); }
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 52dcec1861..0144d87396 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -17,10 +17,16 @@
 
 #pragma once
 
-#include "util/stack_util.h"
-
 namespace doris {
 
+// When the tuple/block data is greater than 2G, embed the tuple/block data
+// and the request serialization string in the attachment, and use "http" brpc.
+// "http" brpc requires that only one of the request body and the attachment be non-null.
+//
+// 2G: In the default "baidu_std" brpc, the upper limit of the request and attachment length is 2G.
+constexpr size_t MIN_HTTP_BRPC_SIZE = (1ULL << 31);
+
+// TODO(zxy) delete in v1.3 version
 // Transfer RowBatch in ProtoBuf Request to Controller Attachment.
 // This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
 // and it is expected that performance can be improved.
@@ -34,6 +40,7 @@ inline void request_row_batch_transfer_attachment(Params* brpc_request, const st
     closure->cntl.request_attachment().swap(attachment);
 }
 
+// TODO(zxy) delete in v1.3 version
 // Transfer Block in ProtoBuf Request to Controller Attachment.
 // This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
 // and it is expected that performance can be improved.
@@ -47,6 +54,7 @@ inline void request_block_transfer_attachment(Params* brpc_request, const std::s
     closure->cntl.request_attachment().swap(attachment);
 }
 
+// TODO(zxy) delete in v1.3 version
 // Controller Attachment transferred to RowBatch in ProtoBuf Request.
 template <typename Params>
 inline void attachment_transfer_request_row_batch(const Params* brpc_request, brpc::Controller* cntl) {
@@ -59,6 +67,7 @@ inline void attachment_transfer_request_row_batch(const Params* brpc_request, br
     }
 }
 
+// TODO(zxy) delete in v1.3 version
 // Controller Attachment transferred to Block in ProtoBuf Request.
 template <typename Params>
 inline void attachment_transfer_request_block(const Params* brpc_request, brpc::Controller* cntl) {
@@ -71,4 +80,101 @@ inline void attachment_transfer_request_block(const Params* brpc_request, brpc::
     }
 }
 
+// Embed tuple_data and brpc request serialization string in controller attachment.
+template <typename Params, typename Closure>
+inline Status request_embed_attachment_contain_tuple(Params* brpc_request, Closure* closure) {
+    auto row_batch = brpc_request->row_batch();
+    Status st = request_embed_attachment(brpc_request, row_batch.tuple_data(), closure);
+    row_batch.set_tuple_data("");
+    return st;
+}
+
+// Embed column_values and brpc request serialization string in controller attachment.
+template <typename Params, typename Closure>
+inline Status request_embed_attachment_contain_block(Params* brpc_request, Closure* closure) {
+    auto block = brpc_request->block();
+    Status st = request_embed_attachment(brpc_request, block.column_values(), closure);
+    block.set_column_values("");
+    return st;
+}
+
+template <typename Params, typename Closure>
+inline Status request_embed_attachment(Params* brpc_request, const std::string& data,
+                                       Closure* closure) {
+    butil::IOBuf attachment;
+
+    // step1: serialize brpc_request to string, and append to attachment.
+    std::string req_str;
+    brpc_request->SerializeToString(&req_str);
+    int64_t req_str_size = req_str.size();
+    attachment.append(&req_str_size, sizeof(req_str_size));
+    attachment.append(req_str);
+
+    // step2: append data to attachment and put it in the closure.
+    int64_t data_size = data.size();
+    attachment.append(&data_size, sizeof(data_size));
+    try {
+        attachment.append(data);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        LOG(WARNING) << "Try to alloc " << data_size
+                     << " bytes for append data to attachment failed. "
+                     << (p ? p.__cxa_exception_type()->name() : "null");
+        return Status::MemoryAllocFailed(
+                fmt::format("request embed attachment failed to memcpy {} bytes", data_size));
+    }
+    // step3: attachment add to closure.
+    closure->cntl.request_attachment().swap(attachment);
+    return Status::OK();
+}
+
+// Extract the brpc request and tuple data from the controller attachment,
+// and put the tuple data into the request.
+template <typename Params>
+inline Status attachment_extract_request_contain_tuple(const Params* brpc_request,
+                                                       brpc::Controller* cntl) {
+    Params* req = const_cast<Params*>(brpc_request);
+    auto rb = req->mutable_row_batch();
+    return attachment_extract_request(req, cntl, rb->mutable_tuple_data());
+}
+
+// Extract the brpc request and block from the controller attachment,
+// and put the block into the request.
+template <typename Params>
+inline Status attachment_extract_request_contain_block(const Params* brpc_request,
+                                                       brpc::Controller* cntl) {
+    Params* req = const_cast<Params*>(brpc_request);
+    auto block = req->mutable_block();
+    return attachment_extract_request(req, cntl, block->mutable_column_values());
+}
+
+template <typename Params>
+inline Status attachment_extract_request(const Params* brpc_request, brpc::Controller* cntl,
+                                         std::string* data) {
+    const butil::IOBuf& io_buf = cntl->request_attachment();
+
+    // step1: deserialize request string to brpc_request from attachment.
+    int64_t req_str_size;
+    io_buf.copy_to(&req_str_size, sizeof(req_str_size), 0);
+    std::string req_str;
+    io_buf.copy_to(&req_str, req_str_size, sizeof(req_str_size));
+    Params* req = const_cast<Params*>(brpc_request);
+    req->ParseFromString(req_str);
+
+    // step2: extract data from attachment.
+    int64_t data_size;
+    io_buf.copy_to(&data_size, sizeof(data_size), sizeof(req_str_size) + req_str_size);
+    try {
+        io_buf.copy_to(data, data_size, sizeof(data_size) + sizeof(req_str_size) + req_str_size);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        LOG(WARNING) << "Try to alloc " << data_size
+                     << " bytes for extract data from attachment failed. "
+                     << (p ? p.__cxa_exception_type()->name() : "null");
+        return Status::MemoryAllocFailed(
+                fmt::format("attachment extract request failed to memcpy {} bytes", data_size));
+    }
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 0d275f6604..bcc06b51b1 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -721,7 +721,7 @@ Status Block::filter_block(Block* block, int filter_column_id, int column_to_kee
 }
 
 Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
-                        std::string* allocated_buf) const {
+                        bool allow_transfer_large_data) const {
     // calc uncompressed size for allocation
     size_t content_uncompressed_size = 0;
     for (const auto& c : *this) {
@@ -733,8 +733,20 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
     // serialize data values
     // when data type is HLL, content_uncompressed_size maybe larger than real size.
-    allocated_buf->resize(content_uncompressed_size);
-    char* buf = allocated_buf->data();
+    std::string* column_values = nullptr;
+    try {
+        column_values = pblock->mutable_column_values();
+        column_values->resize(content_uncompressed_size);
+    } catch (...) {
+        std::exception_ptr p = std::current_exception();
+        std::string msg = fmt::format(
+                "Try to alloc {} bytes for pblock column values failed. reason {}",
+                content_uncompressed_size, p ? p.__cxa_exception_type()->name() : "null");
+        LOG(WARNING) << msg;
+        return Status::BufferAllocFailed(msg);
+    }
+    char* buf = column_values->data();
+
     for (const auto& c : *this) {
         buf = c.type->serialize(*(c.column), buf);
     }
@@ -750,12 +762,12 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
 
         size_t compressed_size = 0;
         char* compressed_output = compression_scratch.data();
-        snappy::RawCompress(allocated_buf->data(), content_uncompressed_size, compressed_output,
+        snappy::RawCompress(column_values->data(), content_uncompressed_size, compressed_output,
                             &compressed_size);
 
         if (LIKELY(compressed_size < content_uncompressed_size)) {
             compression_scratch.resize(compressed_size);
-            allocated_buf->swap(compression_scratch);
+            column_values->swap(compression_scratch);
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
@@ -765,7 +777,10 @@ Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* comp
         VLOG_ROW << "uncompressed size: " << content_uncompressed_size
                  << ", compressed size: " << compressed_size;
     }
-
+    if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) {
+        return Status::InternalError(fmt::format(
+                "The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 9bc86c4533..31b3602a8b 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -248,7 +248,8 @@ public:
     }
 
     // serialize block to PBlock
-    Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes, std::string* allocated_buf) const;
+    Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* compressed_bytes,
+                     bool allow_transfer_large_data = false) const;
 
     // serialize block to PRowbatch
     void serialize(RowBatch*, const RowDescriptor&);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index cbadfc931f..0f1cf57c7f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -50,6 +50,11 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
     _finst_id.set_hi(_fragment_instance_id.hi);
     _finst_id.set_lo(_fragment_instance_id.lo);
     _brpc_request.set_allocated_finst_id(&_finst_id);
+
+    _query_id.set_hi(state->query_id().hi);
+    _query_id.set_lo(state->query_id().lo);
+    _brpc_request.set_allocated_query_id(&_query_id);
+
     _brpc_request.set_node_id(_dest_node_id);
     _brpc_request.set_sender_id(_parent->_sender_id);
     _brpc_request.set_be_number(_be_number);
@@ -67,6 +72,7 @@ Status VDataStreamSender::Channel::init(RuntimeState* state) {
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
     _need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
+    _state = state;
     return Status::OK();
 }
 
@@ -138,13 +144,27 @@ Status VDataStreamSender::Channel::send_block(PBlock* block, bool eos) {
     _closure->ref();
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
 
-    if (_brpc_request.has_block()) {
-        request_block_transfer_attachment<PTransmitDataParams,
-            RefCountClosure<PTransmitDataResult>>(&_brpc_request, _parent->_column_values_buffer,
-                    _closure);
+    if (_parent->_transfer_large_data_by_brpc && _brpc_request.has_block() &&
+        _brpc_request.block().has_column_values() &&
+        _brpc_request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE) {
+        Status st = request_embed_attachment_contain_block<PTransmitDataParams,
+                                                           RefCountClosure<PTransmitDataResult>>(
+                &_brpc_request, _closure);
+        RETURN_IF_ERROR(st);
+        std::string brpc_url =
+                fmt::format("http://{}:{}", _brpc_dest_addr.hostname, _brpc_dest_addr.port);
+        std::shared_ptr<PBackendService_Stub> _brpc_http_stub =
+                _state->exec_env()->brpc_internal_client_cache()->get_new_client_no_cache(brpc_url,
+                                                                                          "http");
+        _closure->cntl.http_request().uri() =
+                brpc_url + "/PInternalServiceImpl/transmit_block_by_http";
+        _closure->cntl.http_request().set_method(brpc::HTTP_METHOD_POST);
+        _closure->cntl.http_request().set_content_type("application/json");
+        _brpc_http_stub->transmit_block_by_http(&_closure->cntl, NULL, &_closure->result, _closure);
+    } else {
+        _closure->cntl.http_request().Clear();
+        _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
     }
-
-    _brpc_stub->transmit_block(&_closure->cntl, &_brpc_request, &_closure->result, _closure);
     if (block != nullptr) {
         _brpc_request.release_block();
     }
@@ -259,7 +279,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
-          _dest_node_id(sink.dest_node_id) {
+          _dest_node_id(sink.dest_node_id),
+          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -563,7 +584,8 @@ Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int num_rece
         SCOPED_TIMER(_serialize_batch_timer);
         dest->Clear();
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes, &_column_values_buffer));
+        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, &compressed_bytes,
+                                       _transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
     }
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 67faae452b..09c9549a5b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -144,10 +144,8 @@ protected:
     // Identifier of the destination plan node.
     PlanNodeId _dest_node_id;
 
-    // This buffer is used to store the serialized block data
-    // The data in the buffer is copied to the attachment of the brpc when it is sent,
-    // to avoid an extra pb serialization in the brpc.
-    std::string _column_values_buffer;
+    // The config may be changed at runtime, so cache its value here to keep it stable during a query or load.
+    bool _transfer_large_data_by_brpc = false;
 };
 
 // TODO: support local exechange
@@ -188,6 +186,7 @@ public:
         }
         // release this before the request destructs
         _brpc_request.release_finst_id();
+        _brpc_request.release_query_id();
     }
 
     // Initialize channel.
@@ -279,6 +278,7 @@ private:
     TNetworkAddress _brpc_dest_addr;
 
     PUniqueId _finst_id;
+    PUniqueId _query_id;
     PBlock _pb_block;
     PTransmitDataParams _brpc_request;
     std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
@@ -287,6 +287,7 @@ private:
     // whether the dest can be treated as query statistics transfer chain.
     bool _is_transfer_chain;
     bool _send_query_statistics_with_every_batch;
+    RuntimeState* _state;
 
     size_t _capacity;
     bool _is_local;
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 6de189099e..b3c948688c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -17,6 +17,8 @@
 
 #include "vec/sink/vtablet_sink.h"
 
+#include "util/brpc_client_cache.h"
+#include "util/debug/sanitizer_scopes.h"
 #include "util/doris_metrics.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 3d8366d1d3..a6f4fef966 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -157,14 +157,13 @@ TEST(BlockTest, RowBatchCovertToBlock) {
 void block_to_pb(const vectorized::Block& block, PBlock* pblock) {
     size_t uncompressed_bytes = 0;
     size_t compressed_bytes = 0;
-    std::string column_values_buffer;
-    Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes, &column_values_buffer);
+    Status st = block.serialize(pblock, &uncompressed_bytes, &compressed_bytes);
     EXPECT_TRUE(st.ok());
     EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
-    EXPECT_EQ(compressed_bytes, column_values_buffer.size());
-    pblock->set_column_values(column_values_buffer);
-    
-    const vectorized::ColumnWithTypeAndName& type_and_name = block.get_columns_with_type_and_name()[0];
+    EXPECT_EQ(compressed_bytes, pblock->column_values().size());
+
+    const vectorized::ColumnWithTypeAndName& type_and_name =
+            block.get_columns_with_type_and_name()[0];
     EXPECT_EQ(type_and_name.name, pblock->column_metas()[0].name());
 }
 
diff --git a/docs/en/administrator-guide/config/be_config.md b/docs/en/administrator-guide/config/be_config.md
index b1d9f9c5eb..8afd14f61a 100644
--- a/docs/en/administrator-guide/config/be_config.md
+++ b/docs/en/administrator-guide/config/be_config.md
@@ -186,11 +186,11 @@ This configuration is mainly used to modify the parameter `socket_max_unwritten_
 
 Sometimes the query fails and an error message of `The server is overcrowded` will appear in the BE log. This means there are too many messages to buffer at the sender side, which may happen when the SQL needs to send large bitmap value. You can avoid this error by increasing the configuration.
 
-### `transfer_data_by_brpc_attachment`
+### `transfer_large_data_by_brpc`
 
 * Type: bool
-* Description: This configuration is used to control whether to transfer the RowBatch in the ProtoBuf Request to the Controller Attachment and then send it through brpc. When the length of ProtoBuf Request exceeds 2G, an error will be reported: Bad request, error_text=[E1003]Fail to compress request, Putting RowBatch in Controller Attachment will be faster and avoid this error.
-* Default value: false
+* Description: This configuration controls whether, when the length of the Tuple/Block data is greater than 1.8G, the ProtoBuf request is serialized and embedded together with the Tuple/Block data into the controller attachment and sent through http brpc. This avoids the error reported when the length of the ProtoBuf request exceeds 2G: Bad request, error_text=[E1003]Fail to compress request. In the past version, after putting Tuple/Block data in the attachment, it was sent through the default baidu_std brpc, [...]
+* Default value: true
 
 ### `brpc_num_threads`
 
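For readers tracking how the new config takes effect, the sender-side gating added in be/src/vec/sink/vdata_stream_sender.cpp above reduces to a size check against this config. Below is a condensed C++ sketch, not the verbatim implementation: the helper name should_transmit_by_http is invented here, the exact value of MIN_HTTP_BRPC_SIZE is assumed (the docs only say "greater than 1.8G"), and it assumes the generated messages live in namespace doris under the usual gen_cpp include layout.

    #include <cstddef>
    #include "gen_cpp/internal_service.pb.h" // PTransmitDataParams / PBlock (generated)

    // Assumed threshold; the real MIN_HTTP_BRPC_SIZE constant lives in the BE sources.
    static constexpr size_t MIN_HTTP_BRPC_SIZE = 1800ULL * 1024 * 1024; // ~1.8G

    // Mirrors the gating in VDataStreamSender::Channel::send_block(): only a request whose
    // serialized size exceeds the threshold takes the http brpc path; everything else stays
    // on the default baidu_std brpc, which fails once a message exceeds 2G.
    bool should_transmit_by_http(const doris::PTransmitDataParams& request,
                                 bool transfer_large_data_by_brpc) {
        return transfer_large_data_by_brpc && request.has_block() &&
               request.block().has_column_values() &&
               request.ByteSizeLong() > MIN_HTTP_BRPC_SIZE;
    }

When the check is false, the channel clears any stale http request state and calls the normal transmit_block stub, exactly as in the diff above.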
diff --git a/docs/zh-CN/administrator-guide/config/be_config.md b/docs/zh-CN/administrator-guide/config/be_config.md
index 1dd7e7a015..abbfc3b20b 100644
--- a/docs/zh-CN/administrator-guide/config/be_config.md
+++ b/docs/zh-CN/administrator-guide/config/be_config.md
@@ -179,11 +179,11 @@ Metrics: {"filtered_rows":0,"input_row_num":3346807,"input_rowsets_count":42,"in
 
 有时查询失败,BE 日志中会出现 `The server is overcrowded` 的错误信息,表示连接上有过多的未发送数据。当查询需要发送较大的bitmap字段时,可能会遇到该问题,此时可能通过调大该配置避免该错误。
 
-### `transfer_data_by_brpc_attachment`
+### `transfer_large_data_by_brpc`
 
 * 类型: bool
-* 描述:该配置用来控制是否将ProtoBuf Request中的RowBatch转移到Controller Attachment后通过brpc发送。ProtoBuf Request的长度超过2G时会报错: Bad request, error_text=[E1003]Fail to compress request,将RowBatch放到Controller Attachment中将更快且避免这个错误。
-* 默认值:false
+* 描述:该配置用来控制是否在 Tuple/Block data 长度大于1.8G时,将 protoBuf request 序列化后和 Tuple/Block data 一起嵌入到 controller attachment 后通过 http brpc 发送。为了避免 protoBuf request 的长度超过2G时的错误:Bad request, error_text=[E1003]Fail to compress request。在过去的版本中,曾将 Tuple/Block data 放入 attachment 后通过默认的 baidu_std brpc 发送,但 attachment 超过2G时将被截断,通过 http brpc 发送不存在2G的限制。
+* 默认值:true
 
 ### `brpc_num_threads`
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index 45662c10cd..9ce6194ddf 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -46,6 +46,7 @@ message PTransmitDataParams {
     optional PBlock block = 9;
     // transfer the RowBatch to the Controller Attachment
     optional bool transfer_by_attachment = 10 [default = false];
+    optional PUniqueId query_id = 11;
 };
 
 message PTransmitDataResult {
@@ -431,8 +432,11 @@ message PResetRPCChannelResponse {
     repeated string channels = 2;
 };
 
+message PEmptyRequest {};
+
 service PBackendService {
     rpc transmit_data(PTransmitDataParams) returns (PTransmitDataResult);
+    rpc transmit_data_by_http(PEmptyRequest) returns (PTransmitDataResult);
     // If #fragments of a query is < 3, use exec_plan_fragment directly.
     // If #fragments of a query is >=3, use exec_plan_fragment_prepare + exec_plan_fragment_start
     rpc exec_plan_fragment(PExecPlanFragmentRequest) returns (PExecPlanFragmentResult);
@@ -442,6 +446,7 @@ service PBackendService {
     rpc fetch_data(PFetchDataRequest) returns (PFetchDataResult);
     rpc tablet_writer_open(PTabletWriterOpenRequest) returns (PTabletWriterOpenResult);
     rpc tablet_writer_add_batch(PTabletWriterAddBatchRequest) returns (PTabletWriterAddBatchResult);
+    rpc tablet_writer_add_batch_by_http(PEmptyRequest) returns (PTabletWriterAddBatchResult);
     rpc tablet_writer_cancel(PTabletWriterCancelRequest) returns (PTabletWriterCancelResult);
     rpc get_info(PProxyRequest) returns (PProxyResult); 
     rpc update_cache(PUpdateCacheRequest) returns (PCacheResponse);
@@ -454,6 +459,7 @@ service PBackendService {
     rpc apply_filter(PPublishFilterRequest) returns (PPublishFilterResponse);
     rpc fold_constant_expr(PConstantExprRequest) returns (PConstantExprResult);
     rpc transmit_block(PTransmitDataParams) returns (PTransmitDataResult);
+    rpc transmit_block_by_http(PEmptyRequest) returns (PTransmitDataResult);
     rpc check_rpc_channel(PCheckRPCChannelRequest) returns (PCheckRPCChannelResponse);
     rpc reset_rpc_channel(PResetRPCChannelRequest) returns (PResetRPCChannelResponse);
     rpc hand_shake(PHandShakeRequest) returns (PHandShakeResponse);
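A note on the new *_by_http RPCs: they take PEmptyRequest because the real request does not travel in the protobuf body at all; the serialized PTransmitDataParams (or PTabletWriterAddBatchRequest) and the column data are embedded in the brpc attachment, and http brpc does not cap the attachment at 2G. The sketch below shows one possible shape of the receiving side. The controller and attachment handling use standard brpc/butil APIs, but the free-function form, the assumption that the attachment carries exactly the serialized request, and the omitted dispatch are simplifications, not the code in be/src/service/internal_service.cpp.

    #include <brpc/closure_guard.h>
    #include <brpc/controller.h>
    #include <butil/iobuf.h>
    #include <google/protobuf/service.h>
    #include "gen_cpp/internal_service.pb.h"

    // Hypothetical handler shape for transmit_block_by_http: the PEmptyRequest body is ignored
    // and the real PTransmitDataParams is parsed out of the http request attachment.
    void transmit_block_by_http(google::protobuf::RpcController* controller,
                                const doris::PEmptyRequest* /*request*/,
                                doris::PTransmitDataResult* response,
                                google::protobuf::Closure* done) {
        brpc::ClosureGuard closure_guard(done);
        auto* cntl = static_cast<brpc::Controller*>(controller);

        // Assumes (for illustration) the attachment is exactly the serialized request;
        // the real code also carves the raw column bytes back out of the attachment.
        doris::PTransmitDataParams real_request;
        butil::IOBufAsZeroCopyInputStream attachment_stream(cntl->request_attachment());
        if (!real_request.ParseFromZeroCopyStream(&attachment_stream)) {
            cntl->SetFailed("failed to parse PTransmitDataParams from http attachment");
            return;
        }
        // ... hand real_request to the same code path that serves transmit_block(), filling response ...
    }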




[incubator-doris] 02/04: [Bug] Fix bug push value predicate of unique table when have sequence column (#10060)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 6383b60b5a6d022bf4e453c0bfbf4e451a529c02
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue Jun 14 15:35:31 2022 +0800

    [Bug] Fix bug push value predicate of unique table when have sequence column (#10060)
    
    Co-authored-by: lihaopeng <li...@baidu.com>
---
 be/src/olap/rowset/beta_rowset_reader.cpp | 14 ++++++++++----
 be/src/olap/rowset/beta_rowset_reader.h   |  2 ++
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index 3148656dd6..89b150dca9 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -79,10 +79,8 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
                                               read_context->predicates->begin(),
                                               read_context->predicates->end());
     }
-    // if unique table with rowset [0-x] or [0-1] [2-y] [...],
-    // value column predicates can be pushdown on rowset [0-x] or [2-y]
-    if (_rowset->keys_type() == UNIQUE_KEYS &&
-        (_rowset->start_version() == 0 || _rowset->start_version() == 2)) {
+
+    if (_should_push_down_value_predicates()) {
         if (read_context->value_predicates != nullptr) {
             read_options.column_predicates.insert(read_options.column_predicates.end(),
                                                   read_context->value_predicates->begin(),
@@ -240,4 +238,12 @@ OLAPStatus BetaRowsetReader::next_block(vectorized::Block* block) {
     return OLAP_SUCCESS;
 }
 
+bool BetaRowsetReader::_should_push_down_value_predicates() const {
+    // If a unique table has rowsets [0-x] or [0-1] [2-y] [...],
+    // value column predicates can be pushed down to rowset [0-x] or [2-y]; [2-y] must be produced by compaction and have non-overlapping segments
+    return _rowset->keys_type() == UNIQUE_KEYS &&
+           (_rowset->start_version() == 0 || _rowset->start_version() == 2) &&
+           !_rowset->_rowset_meta->is_segments_overlapping();
+}
+
 } // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h
index 55a8938dbf..919e13cb7a 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -56,6 +56,8 @@ public:
     RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
 
 private:
+    bool _should_push_down_value_predicates() const;
+
     std::unique_ptr<Schema> _schema;
     RowsetReaderContext* _context;
     BetaRowsetSharedPtr _rowset;
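To make the pushdown condition above concrete, here is a self-contained sketch using a simplified stand-in struct instead of Doris's Rowset/RowsetMeta classes; the struct and function names are invented for illustration, and the real check is BetaRowsetReader::_should_push_down_value_predicates().

    #include <cstdint>

    // Simplified stand-in for the rowset metadata the real check consults.
    struct RowsetInfo {
        bool unique_keys;            // table keys type == UNIQUE_KEYS
        int64_t start_version;       // first version covered by the rowset
        bool segments_overlapping;   // segments may still contain duplicate keys
    };

    // Value-column predicates may be pushed into a rowset only when it is the base
    // rowset of a unique table ([0-x], or [2-y] behind the initial [0-1] rowset)
    // and it has been compacted so that its segments do not overlap.
    bool should_push_down_value_predicates(const RowsetInfo& rs) {
        return rs.unique_keys &&
               (rs.start_version == 0 || rs.start_version == 2) &&
               !rs.segments_overlapping;
    }

    // Examples:
    // should_push_down_value_predicates({true, 0, false}) == true   (compacted base rowset)
    // should_push_down_value_predicates({true, 2, true})  == false  (segments still overlap)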




[incubator-doris] 04/04: [BUGFIX] Fix wrong column types in result file sink (#10079)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit bdfd1d091f281c41ebb62be527f57bafd6649073
Author: Gabriel <ga...@gmail.com>
AuthorDate: Mon Jun 13 09:05:11 2022 +0800

    [BUGFIX] Fix wrong column types in result file sink (#10079)
---
 be/src/vec/runtime/vfile_result_writer.cpp | 17 +----------------
 1 file changed, 1 insertion(+), 16 deletions(-)

diff --git a/be/src/vec/runtime/vfile_result_writer.cpp b/be/src/vec/runtime/vfile_result_writer.cpp
index 8d70baa854..71a748e565 100644
--- a/be/src/vec/runtime/vfile_result_writer.cpp
+++ b/be/src/vec/runtime/vfile_result_writer.cpp
@@ -423,25 +423,10 @@ Status VFileResultWriter::_fill_result_block() {
             INSERT_TO_COLUMN;
             break;
         }
-        case TYPE_LARGEINT: {
-            auto column = ColumnVector<int128_t>::create();
-            INSERT_TO_COLUMN;
-            break;
-        }
-        case TYPE_SMALLINT: {
-            auto column = ColumnVector<int16_t>::create();
-            INSERT_TO_COLUMN;
-            break;
-        }
-        case TYPE_TINYINT: {
-            auto column = ColumnVector<int8_t>::create();
-            INSERT_TO_COLUMN;
-            break;
-        }
         case TYPE_VARCHAR:
         case TYPE_CHAR:
         case TYPE_STRING: {
-            auto column = ColumnVector<int8_t>::create();
+            auto column = ColumnString::create();
             INSERT_TO_COLUMN;
             break;
         }
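For context on the fix: TYPE_VARCHAR/CHAR/STRING slots were previously materialized with ColumnVector<int8_t>, a 1-byte numeric column, which is why the result file sink produced wrong column types; string slots need ColumnString, which stores variable-length payloads plus offsets. Below is a minimal sketch with Doris's vectorized column classes; the demo function name is invented, and the real code routes the insert through the INSERT_TO_COLUMN macro shown in the diff.

    #include <iostream>
    #include "vec/columns/column_string.h"

    // Illustrative only: build a string result column the way the fixed switch case does.
    void demo_string_result_column() {
        auto column = doris::vectorized::ColumnString::create(); // correct pairing for string types
        const char* value = "doris";
        column->insert_data(value, 5);                           // append one string cell
        std::cout << column->size() << std::endl;                // prints 1
    }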




[incubator-doris] 03/04: [fix] fix bug that show proc "/cluster_balance/history_tablets" return malformat error (#10073)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 1f17a2b9ae7b0f5b6463f0967818042dddb8f955
Author: Mingyu Chen <mo...@gmail.com>
AuthorDate: Tue Jun 14 15:34:16 2022 +0800

    [fix] fix bug that show proc "/cluster_balance/history_tablets" return malformat error (#10073)
---
 .../common/proc/TabletSchedulerDetailProcDir.java  | 11 ++--
 .../org/apache/doris/planner/HashJoinNode.java     | 22 ++++----
 .../org/apache/doris/planner/OlapScanNode.java     | 58 +++++++---------------
 .../org/apache/doris/planner/PlanFragment.java     |  5 +-
 .../org/apache/doris/planner/ColocatePlanTest.java |  2 +-
 .../doris/planner/DistributedPlannerTest.java      |  8 +--
 .../planner/MaterializedViewFunctionTest.java      | 16 +++---
 .../apache/doris/planner/ResourceTagQueryTest.java |  6 +--
 .../doris/planner/TableFunctionPlanTest.java       |  2 +-
 .../data/performance/redundant_conjuncts.out       | 28 +++--------
 10 files changed, 63 insertions(+), 95 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
index 161cbc27c2..b050aa7db3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/TabletSchedulerDetailProcDir.java
@@ -34,12 +34,11 @@ import java.util.List;
  * show proc "/tablet_scheduler/history_tablets";
  */
 public class TabletSchedulerDetailProcDir implements ProcDirInterface {
-    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>()
-            .add("TabletId").add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio")
-            .add("SrcBe").add("SrcPath").add("DestBe").add("DestPath").add("Timeout")
-            .add("Create").add("LstSched").add("LstVisit").add("Finished").add("Rate").add("FailedSched")
-            .add("FailedRunning").add("LstAdjPrio").add("VisibleVer").add("VisibleVerHash")
-            .add("CmtVer").add("CmtVerHash").add("ErrMsg")
+    public static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("TabletId")
+            .add("Type").add("Medium").add("Status").add("State").add("OrigPrio").add("DynmPrio").add("SrcBe")
+            .add("SrcPath").add("DestBe").add("DestPath").add("Timeout").add("Create").add("LstSched").add("LstVisit")
+            .add("Finished").add("Rate").add("FailedSched").add("FailedRunning").add("LstAdjPrio").add("VisibleVer")
+            .add("CmtVer").add("ErrMsg")
             .build();
     
     private String type;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 5e6fa40fad..4f1411ce64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -682,23 +682,26 @@ public class HashJoinNode extends PlanNode {
 
     @Override
     public String getNodeExplainString(String detailPrefix, TExplainLevel detailLevel) {
-        String distrModeStr =
-                (distrMode != DistributionMode.NONE) ? (" (" + distrMode.toString() + ")") : "";
-        StringBuilder output = new StringBuilder()
-                .append(detailPrefix).append("join op: ").append(joinOp.toString()).append(distrModeStr).append("\n");
+        String distrModeStr = "";
+        if (isColocate) {
+            distrModeStr = "COLOCATE[" + colocateReason + "]";
+        } else {
+            distrModeStr = distrMode.toString();
+        }
+        StringBuilder output =
+                new StringBuilder().append(detailPrefix).append("join op: ").append(joinOp.toString()).append("(")
+                        .append(distrModeStr).append(")").append("[").append(colocateReason).append("]\n");
 
         if (detailLevel == TExplainLevel.BRIEF) {
             return output.toString();
         }
 
-        output.append(detailPrefix).append("hash predicates:\n")
-                .append(detailPrefix).append("colocate: ").append(isColocate).append(isColocate ? "" : ", reason: " + colocateReason).append("\n");
-
         for (BinaryPredicate eqJoinPredicate : eqJoinConjuncts) {
             output.append(detailPrefix).append("equal join conjunct: ").append(eqJoinPredicate.toSql()).append("\n");
         }
         if (!otherJoinConjuncts.isEmpty()) {
-            output.append(detailPrefix).append("other join predicates: ").append(getExplainString(otherJoinConjuncts)).append("\n");
+            output.append(detailPrefix).append("other join predicates: ").append(getExplainString(otherJoinConjuncts))
+                    .append("\n");
         }
         if (!conjuncts.isEmpty()) {
             output.append(detailPrefix).append("other predicates: ").append(getExplainString(conjuncts)).append("\n");
@@ -707,8 +710,7 @@ public class HashJoinNode extends PlanNode {
             output.append(detailPrefix).append("runtime filters: ");
             output.append(getRuntimeFilterExplainString(true));
         }
-        output.append(detailPrefix).append(String.format(
-                "cardinality=%s", cardinality)).append("\n");
+        output.append(detailPrefix).append(String.format("cardinality=%s", cardinality)).append("\n");
         // todo unify in plan node
         if (outputSlotIds != null) {
             output.append(detailPrefix).append("output slot ids: ");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 7cc79ab534..6d33efbbeb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -687,63 +687,43 @@ public class OlapScanNode extends ScanNode {
     public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
         StringBuilder output = new StringBuilder();
 
-        output.append(prefix).append("TABLE: ").append(olapTable.getName()).append("\n");
-
+        String indexName = olapTable.getIndexNameById(selectedIndexId);
+        output.append(prefix).append("TABLE: ").append(olapTable.getName()).append("(").append(indexName).append(")");
         if (detailLevel == TExplainLevel.BRIEF) {
             return output.toString();
         }
+        if (isPreAggregation) {
+            output.append(", PREAGGREGATION: ON");
+        } else {
+            output.append(", PREAGGREGATION: OFF. Reason: ").append(reasonOfPreAggregation);
+        }
+        output.append("\n");
 
         if (null != sortColumn) {
             output.append(prefix).append("SORT COLUMN: ").append(sortColumn).append("\n");
         }
-        if (isPreAggregation) {
-            output.append(prefix).append("PREAGGREGATION: ON").append("\n");
-        } else {
-            output.append(prefix).append("PREAGGREGATION: OFF. Reason: ").append(reasonOfPreAggregation).append("\n");
-        }
+
         if (!conjuncts.isEmpty()) {
-            output.append(prefix).append("PREDICATES: ").append(
-                    getExplainString(conjuncts)).append("\n");
+            output.append(prefix).append("PREDICATES: ").append(getExplainString(conjuncts)).append("\n");
         }
         if (!runtimeFilters.isEmpty()) {
             output.append(prefix).append("runtime filters: ");
             output.append(getRuntimeFilterExplainString(false));
         }
 
-        output.append(prefix).append(String.format(
-                "partitions=%s/%s",
-                selectedPartitionNum,
-                olapTable.getPartitions().size()));
-
-        String indexName = olapTable.getIndexNameById(selectedIndexId);
-        output.append("\n").append(prefix).append(String.format("rollup: %s", indexName));
-
-        output.append("\n");
-
-        output.append(prefix).append(String.format(
-                "tabletRatio=%s/%s", selectedTabletsNum, totalTabletsNum));
-        output.append("\n");
-
-        // We print up to 10 tablet, and we print "..." if the number is more than 10
-        if (scanTabletIds.size() > 10) {
-            List<Long> firstTenTabletIds = scanTabletIds.subList(0, 10);
-            output.append(prefix).append(String.format("tabletList=%s ...", Joiner.on(",").join(firstTenTabletIds)));
+        output.append(prefix).append(String.format("partitions=%s/%s, tablets=%s/%s", selectedPartitionNum,
+                olapTable.getPartitions().size(), selectedTabletsNum, totalTabletsNum));
+        // We print up to 3 tablets, and print "..." if there are more than 3
+        if (scanTabletIds.size() > 3) {
+            List<Long> firstThreeTabletIds = scanTabletIds.subList(0, 3);
+            output.append(String.format(", tabletList=%s ...", Joiner.on(",").join(firstThreeTabletIds)));
         } else {
-            output.append(prefix).append(String.format("tabletList=%s", Joiner.on(",").join(scanTabletIds)));
+            output.append(String.format(", tabletList=%s", Joiner.on(",").join(scanTabletIds)));
         }
-
-        output.append("\n");
-
-        output.append(prefix).append(String.format(
-                "cardinality=%s", cardinality));
-        output.append("\n");
-
-        output.append(prefix).append(String.format(
-                "avgRowSize=%s", avgRowSize));
         output.append("\n");
 
-        output.append(prefix).append(String.format(
-                "numNodes=%s", numNodes));
+        output.append(prefix).append(String.format("cardinality=%s", cardinality))
+                .append(String.format(", avgRowSize=%s", avgRowSize)).append(String.format(", numNodes=%s", numNodes));
         output.append("\n");
 
         return output.toString();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index c5d01e6249..1847a6b857 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -285,10 +285,9 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     public String getExplainString(TExplainLevel explainLevel) {
         StringBuilder str = new StringBuilder();
         Preconditions.checkState(dataPartition != null);
-        str.append(" OUTPUT EXPRS:");
         if (CollectionUtils.isNotEmpty(outputExprs)) {
-            str.append(outputExprs.stream().map(Expr::toSql)
-                    .collect(Collectors.joining(" | ")));
+            str.append("  OUTPUT EXPRS:");
+            str.append(outputExprs.stream().map(Expr::toSql).collect(Collectors.joining(" | ")));
         }
         str.append("\n");
         str.append("  PARTITION: " + dataPartition.getExplainString(explainLevel) + "\n");
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
index 716e3c68be..c944dd6a94 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ColocatePlanTest.java
@@ -40,7 +40,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class ColocatePlanTest {
-    private static final String COLOCATE_ENABLE = "colocate: true";
+    public static final String COLOCATE_ENABLE = "COLOCATE";
     private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/";
     private static ConnectContext ctx;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
index 94b68b295d..ee5ad95e6a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/DistributedPlannerTest.java
@@ -137,7 +137,7 @@ public class DistributedPlannerTest {
         Planner planner = stmtExecutor.planner();
         List<PlanFragment> fragments = planner.getFragments();
         String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
-        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (BROADCAST)"));
+        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(BROADCAST)"));
 
         sql = "explain select * from db1.tbl1 join [SHUFFLE] db1.tbl2 on tbl1.k1 = tbl2.k3";
         stmtExecutor = new StmtExecutor(ctx, sql);
@@ -145,7 +145,7 @@ public class DistributedPlannerTest {
         planner = stmtExecutor.planner();
         fragments = planner.getFragments();
         plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
-        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (PARTITIONED)"));
+        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(PARTITIONED)"));
     }
 
     @Test
@@ -156,7 +156,7 @@ public class DistributedPlannerTest {
         Planner planner = stmtExecutor.planner();
         List<PlanFragment> fragments = planner.getFragments();
         String plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
-        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (BROADCAST)"));
+        Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(BROADCAST)"));
 
         double originThreshold = ctx.getSessionVariable().autoBroadcastJoinThreshold;
         try {
@@ -166,7 +166,7 @@ public class DistributedPlannerTest {
             planner = stmtExecutor.planner();
             fragments = planner.getFragments();
             plan = planner.getExplainString(fragments, new ExplainOptions(false, false));
-            Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN (PARTITIONED)"));
+            Assert.assertEquals(1, StringUtils.countMatches(plan, "INNER JOIN(PARTITIONED)"));
         } finally {
             ctx.getSessionVariable().autoBroadcastJoinThreshold = originThreshold;
         }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
index 1e34a47b84..68a4eedcd1 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/MaterializedViewFunctionTest.java
@@ -34,21 +34,21 @@ import java.util.UUID;
 
 public class MaterializedViewFunctionTest {
     private static String baseDir = "fe";
-    private static String runningDir = baseDir + "/mocked/MaterializedViewFunctionTest/"
-            + UUID.randomUUID().toString() + "/";
+    private static String runningDir =
+            baseDir + "/mocked/MaterializedViewFunctionTest/" + UUID.randomUUID().toString() + "/";
     private static final String EMPS_TABLE_NAME = "emps";
     private static final String EMPS_MV_NAME = "emps_mv";
     private static final String HR_DB_NAME = "db1";
-    private static final String QUERY_USE_EMPS_MV = "rollup: " + EMPS_MV_NAME;
-    private static final String QUERY_USE_EMPS = "rollup: " + EMPS_TABLE_NAME;
+    private static final String QUERY_USE_EMPS_MV = "(" + EMPS_MV_NAME + ")";
+    private static final String QUERY_USE_EMPS = "(" + EMPS_TABLE_NAME + ")";
     private static final String DEPTS_TABLE_NAME = "depts";
     private static final String DEPTS_MV_NAME = "depts_mv";
-    private static final String QUERY_USE_DEPTS_MV = "rollup: " + DEPTS_MV_NAME;
-    private static final String QUERY_USE_DEPTS = "rollup: " + DEPTS_TABLE_NAME;
+    private static final String QUERY_USE_DEPTS_MV = "(" + DEPTS_MV_NAME + ")";
+    private static final String QUERY_USE_DEPTS = "(" + DEPTS_TABLE_NAME + ")";
     private static final String USER_TAG_TABLE_NAME = "user_tags";
     private static final String USER_TAG_MV_NAME = "user_tags_mv";
-    private static final String QUERY_USE_USER_TAG_MV = "rollup: " + USER_TAG_MV_NAME;
-    private static final String QUERY_USE_USER_TAG = "rollup: " + USER_TAG_TABLE_NAME;
+    private static final String QUERY_USE_USER_TAG_MV = "(" + USER_TAG_MV_NAME + ")";
+    private static final String QUERY_USE_USER_TAG = "(" + USER_TAG_TABLE_NAME + ")";
     private static final String TEST_TABLE_NAME = "test_tb";
     private static DorisAssert dorisAssert;
 
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
index e43c672c93..047e5a8258 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/ResourceTagQueryTest.java
@@ -209,7 +209,7 @@ public class ResourceTagQueryTest {
         String queryStr = "explain select * from test.tbl1";
         String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
         System.out.println(explainString);
-        Assert.assertTrue(explainString.contains("tabletRatio=30/30"));
+        Assert.assertTrue(explainString.contains("tablets=30/30"));
 
         // set zone1 tag for root
         String setPropStr2 = "set property for 'root' 'resource_tags.location' = 'zone1';";
@@ -247,7 +247,7 @@ public class ResourceTagQueryTest {
         queryStr = "explain select * from test.tbl1";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
         System.out.println(explainString);
-        Assert.assertTrue(explainString.contains("tabletRatio=30/30"));
+        Assert.assertTrue(explainString.contains("tablets=30/30"));
 
         // for now, 3 backends with tag zone1, 2 with tag default, so table is not stable.
         ExceptionChecker.expectThrows(UserException.class, () -> tbl.checkReplicaAllocation());
@@ -270,7 +270,7 @@ public class ResourceTagQueryTest {
         queryStr = "explain select * from test.tbl1";
         explainString = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr);
         System.out.println(explainString);
-        Assert.assertTrue(explainString.contains("tabletRatio=30/30"));
+        Assert.assertTrue(explainString.contains("tablets=30/30"));
 
         // set user exec mem limit
         String setExecMemLimitStr = "set property for 'root' 'exec_mem_limit' = '1000000';";
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
index 0480791baf..56d020c6c7 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/TableFunctionPlanTest.java
@@ -497,7 +497,7 @@ public class TableFunctionPlanTest {
     public void testLateralViewWithWhere() throws Exception {
         String sql = "select k1,e1 from db1.table_for_view lateral view explode_split(k3,',') tmp as e1 where k1 in (select k2 from db1.table_for_view);";
         String explainString = UtFrameUtils.getSQLPlanOrErrorMsg(ctx, sql, true);
-        Assert.assertTrue(explainString.contains("join op: LEFT SEMI JOIN (BROADCAST)"));
+        Assert.assertTrue(explainString.contains("join op: LEFT SEMI JOIN(BROADCAST)"));
         Assert.assertTrue(explainString.contains("equal join conjunct: `k1` = `k2`"));
         Assert.assertTrue(!explainString.contains("equal join conjunct: `k2` = `k2`"));
     }
diff --git a/regression-test/data/performance/redundant_conjuncts.out b/regression-test/data/performance/redundant_conjuncts.out
index ab2eadd2ea..af59089638 100644
--- a/regression-test/data/performance/redundant_conjuncts.out
+++ b/regression-test/data/performance/redundant_conjuncts.out
@@ -1,39 +1,27 @@
 -- This file is automatically generated. You should know what you did if you want to edit this
 -- !redundant_conjuncts --
 PLAN FRAGMENT 0
- OUTPUT EXPRS:`v1`
+  OUTPUT EXPRS:`v1`
   PARTITION: HASH_PARTITIONED: `default_cluster:regression_test`.`redundant_conjuncts`.`k1`
 
   VRESULT SINK
 
   0:VOlapScanNode
-     TABLE: redundant_conjuncts
-     PREAGGREGATION: OFF. Reason: No AggregateInfo
+     TABLE: redundant_conjuncts(null), PREAGGREGATION: OFF. Reason: No AggregateInfo
      PREDICATES: `k1` = 1
-     partitions=0/1
-     rollup: null
-     tabletRatio=0/0
-     tabletList=
-     cardinality=0
-     avgRowSize=8.0
-     numNodes=1
+     partitions=0/1, tablets=0/0, tabletList=
+     cardinality=0, avgRowSize=8.0, numNodes=1
 
 -- !redundant_conjuncts_gnerated_by_extract_common_filter --
 PLAN FRAGMENT 0
- OUTPUT EXPRS:`v1`
+  OUTPUT EXPRS:`v1`
   PARTITION: HASH_PARTITIONED: `default_cluster:regression_test`.`redundant_conjuncts`.`k1`
 
   VRESULT SINK
 
   0:VOlapScanNode
-     TABLE: redundant_conjuncts
-     PREAGGREGATION: OFF. Reason: No AggregateInfo
+     TABLE: redundant_conjuncts(null), PREAGGREGATION: OFF. Reason: No AggregateInfo
      PREDICATES: (`k1` = 1 OR `k1` = 2)
-     partitions=0/1
-     rollup: null
-     tabletRatio=0/0
-     tabletList=
-     cardinality=0
-     avgRowSize=8.0
-     numNodes=1
+     partitions=0/1, tablets=0/0, tabletList=
+     cardinality=0, avgRowSize=8.0, numNodes=1
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org