You are viewing a plain-text version of this content; the link to its canonical version was a hyperlink that is not preserved in this format.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/12/20 04:07:46 UTC

[doris] branch master updated: [profile](datasender) add more detail profile in data stream sender (#15176)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a2d56af7d9 [profile](datasender) add more detail profile in data stream sender (#15176)
a2d56af7d9 is described below

commit a2d56af7d9cbaec75178ce2c9bf50c27f546749f
Author: yiguolei <67...@qq.com>
AuthorDate: Tue Dec 20 12:07:34 2022 +0800

    [profile](datasender) add more detail profile in data stream sender (#15176)
    
    * [profile](datasender) add more detail profile in data stream sender
    
    
    Co-authored-by: yiguolei <yi...@gmail.com>
---
 be/src/vec/sink/vdata_stream_sender.cpp | 27 ++++++++++++++++++++++++++-
 be/src/vec/sink/vdata_stream_sender.h   |  8 ++++++--
 2 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 69f2d4d99b..30ce7be6ac 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -106,6 +106,7 @@ Status Channel::send_current_block(bool eos) {
 }
 
 Status Channel::send_local_block(bool eos) {
+    SCOPED_TIMER(_parent->_local_send_timer);
     std::shared_ptr<VDataStreamRecvr> recvr =
             _parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
                                                                     _dest_node_id);
@@ -114,6 +115,7 @@ Status Channel::send_local_block(bool eos) {
     if (recvr != nullptr) {
         COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
         COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
+        COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
         recvr->add_block(&block, _parent->_sender_id, true);
         if (eos) {
             recvr->remove_sender(_parent->_sender_id, _be_number);
@@ -123,12 +125,14 @@ Status Channel::send_local_block(bool eos) {
 }
 
 Status Channel::send_local_block(Block* block) {
+    SCOPED_TIMER(_parent->_local_send_timer);
     std::shared_ptr<VDataStreamRecvr> recvr =
             _parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
                                                                     _dest_node_id);
     if (recvr != nullptr) {
         COUNTER_UPDATE(_parent->_local_bytes_send_counter, block->bytes());
         COUNTER_UPDATE(_parent->_local_sent_rows, block->rows());
+        COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
         recvr->add_block(block, _parent->_sender_id, false);
     }
     return Status::OK();
@@ -136,6 +140,7 @@ Status Channel::send_local_block(Block* block) {
 
 Status Channel::send_block(PBlock* block, bool eos) {
     SCOPED_TIMER(_parent->_brpc_send_timer);
+    COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
     if (_closure == nullptr) {
         _closure = new RefCountClosure<PTransmitDataResult>();
         _closure->ref();
@@ -201,6 +206,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
 
         {
             SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
+            SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
             _mutable_block->add_rows(block, begin, begin + row_add);
         }
 
@@ -208,7 +214,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
         begin += row_add;
 
         if (row_add == max_add) {
-            RETURN_IF_ERROR(send_current_block());
+            RETURN_IF_ERROR(send_current_block(false));
         }
     }
 
@@ -271,6 +277,10 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
           _profile(nullptr),
           _serialize_batch_timer(nullptr),
           _bytes_sent_counter(nullptr),
+          _local_send_timer(nullptr),
+          _split_block_hash_compute_timer(nullptr),
+          _split_block_distribute_by_channel_timer(nullptr),
+          _blocks_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
           _dest_node_id(sink.dest_node_id),
           _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
@@ -327,6 +337,10 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
           _brpc_send_timer(nullptr),
           _brpc_wait_timer(nullptr),
           _bytes_sent_counter(nullptr),
+          _local_send_timer(nullptr),
+          _split_block_hash_compute_timer(nullptr),
+          _split_block_distribute_by_channel_timer(nullptr),
+          _blocks_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
           _dest_node_id(0) {
     _cur_pb_block = &_pb_block1;
@@ -347,6 +361,10 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_
           _brpc_send_timer(nullptr),
           _brpc_wait_timer(nullptr),
           _bytes_sent_counter(nullptr),
+          _local_send_timer(nullptr),
+          _split_block_hash_compute_timer(nullptr),
+          _split_block_distribute_by_channel_timer(nullptr),
+          _blocks_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
           _dest_node_id(0) {
     _cur_pb_block = &_pb_block1;
@@ -432,6 +450,11 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _compress_timer = ADD_TIMER(profile(), "CompressTime");
     _brpc_send_timer = ADD_TIMER(profile(), "BrpcSendTime");
     _brpc_wait_timer = ADD_TIMER(profile(), "BrpcSendTime.Wait");
+    _local_send_timer = ADD_TIMER(profile(), "LocalSendTime");
+    _split_block_hash_compute_timer = ADD_TIMER(profile(), "SplitBlockHashComputeTime");
+    _split_block_distribute_by_channel_timer =
+            ADD_TIMER(profile(), "SplitBlockDistributeByChannelTime");
+    _blocks_sent_counter = ADD_COUNTER(profile(), "BlocksSent", TUnit::UNIT);
     _overall_throughput = profile()->add_derived_counter(
             "OverallThroughput", TUnit::BYTES_PER_SECOND,
             std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
@@ -530,6 +553,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         // TODO: after we support new shuffle hash method, should simple the code
         if (_part_type == TPartitionType::HASH_PARTITIONED) {
             if (!_new_shuffle_hash_method) {
+                SCOPED_TIMER(_split_block_hash_compute_timer);
                 // for each row, we have a siphash val
                 std::vector<SipHash> siphashs(rows);
                 // result[j] means column index, i means rows index
@@ -540,6 +564,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
                     hashes[i] = siphashs[i].get64() % element_size;
                 }
             } else {
+                SCOPED_TIMER(_split_block_hash_compute_timer);
                 // result[j] means column index, i means rows index, here to calculate the xxhash value
                 for (int j = 0; j < result_size; ++j) {
                     block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 7b0239764a..7cc13162ef 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -154,6 +154,10 @@ protected:
     RuntimeProfile::Counter* _uncompressed_bytes_counter;
     RuntimeProfile::Counter* _ignore_rows;
     RuntimeProfile::Counter* _local_sent_rows;
+    RuntimeProfile::Counter* _local_send_timer;
+    RuntimeProfile::Counter* _split_block_hash_compute_timer;
+    RuntimeProfile::Counter* _split_block_distribute_by_channel_timer;
+    RuntimeProfile::Counter* _blocks_sent_counter;
 
     std::unique_ptr<MemTracker> _mem_tracker;
 
@@ -230,7 +234,7 @@ public:
 
     Status add_rows(Block* block, const std::vector<int>& row);
 
-    virtual Status send_current_block(bool eos = false);
+    virtual Status send_current_block(bool eos);
 
     Status send_local_block(bool eos = false);
 
@@ -388,7 +392,7 @@ public:
     }
 
     // send _mutable_block
-    Status send_current_block(bool eos = false) override {
+    Status send_current_block(bool eos) override {
         if (_enable_local_exchange && is_local()) {
             return send_local_block(eos);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org