You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2023/11/02 09:07:14 UTC

(doris) branch master updated: [profile](refactor) Fix invalid shuffle profile (#26298)

This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6828250207a [profile](refactor) Fix invalid shuffle profile (#26298)
6828250207a is described below

commit 6828250207a37985437657c645d3690efd128e00
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu Nov 2 17:07:05 2023 +0800

    [profile](refactor) Fix invalid shuffle profile (#26298)
---
 be/src/pipeline/exec/exchange_sink_operator.cpp    |  8 +++++---
 be/src/pipeline/exec/result_file_sink_operator.cpp |  5 +++++
 be/src/pipeline/exec/result_file_sink_operator.h   | 10 +++++++++
 be/src/vec/sink/vdata_stream_sender.cpp            | 24 ++++++++--------------
 4 files changed, 29 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e8d52b8eaac..9e05d682864 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -392,9 +392,11 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
     } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
                _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         auto rows = block->rows();
-        SCOPED_TIMER(local_state._split_block_hash_compute_timer);
-        RETURN_IF_ERROR(
-                local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
+        {
+            SCOPED_TIMER(local_state._split_block_hash_compute_timer);
+            RETURN_IF_ERROR(
+                    local_state._partitioner->do_partitioning(state, block, _mem_tracker.get()));
+        }
         if (_part_type == TPartitionType::HASH_PARTITIONED) {
             RETURN_IF_ERROR(channel_add_rows(state, local_state.channels,
                                              local_state._partition_count,
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp b/be/src/pipeline/exec/result_file_sink_operator.cpp
index e572bfe93e5..217696c6939 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -106,6 +106,11 @@ Status ResultFileSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& i
     _sender_id = info.sender_id;
 
     _brpc_wait_timer = ADD_TIMER(_profile, "BrpcSendTime.Wait");
+    _local_send_timer = ADD_TIMER(_profile, "LocalSendTime");
+    _brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime");
+    _split_block_distribute_by_channel_timer =
+            ADD_TIMER(_profile, "SplitBlockDistributeByChannelTime");
+    _brpc_send_timer = ADD_TIMER(_profile, "BrpcSendTime");
     auto& p = _parent->cast<ResultFileSinkOperatorX>();
     CHECK(p._file_opts.get() != nullptr);
     if (p._is_top_sink) {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h b/be/src/pipeline/exec/result_file_sink_operator.h
index 3a98401de60..63217aad047 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -58,6 +58,12 @@ public:
     [[nodiscard]] int sender_id() const { return _sender_id; }
 
     RuntimeProfile::Counter* brpc_wait_timer() { return _brpc_wait_timer; }
+    RuntimeProfile::Counter* local_send_timer() { return _local_send_timer; }
+    RuntimeProfile::Counter* brpc_send_timer() { return _brpc_send_timer; }
+    RuntimeProfile::Counter* merge_block_timer() { return _merge_block_timer; }
+    RuntimeProfile::Counter* split_block_distribute_by_channel_timer() {
+        return _split_block_distribute_by_channel_timer;
+    }
 
 private:
     friend class ResultFileSinkOperatorX;
@@ -73,6 +79,10 @@ private:
     vectorized::BlockSerializer<ResultFileSinkLocalState> _serializer;
     std::unique_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
     RuntimeProfile::Counter* _brpc_wait_timer;
+    RuntimeProfile::Counter* _local_send_timer;
+    RuntimeProfile::Counter* _brpc_send_timer;
+    RuntimeProfile::Counter* _merge_block_timer;
+    RuntimeProfile::Counter* _split_block_distribute_by_channel_timer;
 
     int _sender_id;
 };
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index ca565f82d41..a7066f981ba 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -126,9 +126,7 @@ Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
 
 template <typename Parent>
 Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-        SCOPED_TIMER(_parent->local_send_timer());
-    }
+    SCOPED_TIMER(_parent->local_send_timer());
     Block block = _serializer.get_block()->to_block();
     _serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
     if (_recvr_is_valid()) {
@@ -157,9 +155,7 @@ Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
 
 template <typename Parent>
 Status Channel<Parent>::send_local_block(Block* block) {
-    if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-        SCOPED_TIMER(_parent->local_send_timer());
-    }
+    SCOPED_TIMER(_parent->local_send_timer());
     if (_recvr_is_valid()) {
         if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
             COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
@@ -176,9 +172,9 @@ Status Channel<Parent>::send_local_block(Block* block) {
 template <typename Parent>
 Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
     if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-        SCOPED_TIMER(_parent->brpc_send_timer());
         COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
     }
+    SCOPED_TIMER(_parent->brpc_send_timer());
 
     if (_closure == nullptr) {
         _closure = new RefCountClosure<PTransmitDataResult>();
@@ -631,8 +627,10 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
                _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
         auto rows = block->rows();
-        SCOPED_TIMER(_split_block_hash_compute_timer);
-        RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get()));
+        {
+            SCOPED_TIMER(_split_block_hash_compute_timer);
+            RETURN_IF_ERROR(_partitioner->do_partitioning(state, block, _mem_tracker.get()));
+        }
         if (_part_type == TPartitionType::HASH_PARTITIONED) {
             RETURN_IF_ERROR(channel_add_rows(state, _channels, _partition_count,
                                              (uint64_t*)_partitioner->get_channel_ids(), rows,
@@ -729,16 +727,12 @@ Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
         SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
         if (rows) {
             if (rows->size() > 0) {
-                if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-                    SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
-                }
+                SCOPED_TIMER(_parent->split_block_distribute_by_channel_timer());
                 const int* begin = &(*rows)[0];
                 _mutable_block->add_rows(block, begin, begin + rows->size());
             }
         } else if (!block->empty()) {
-            if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
-                SCOPED_TIMER(_parent->merge_block_timer());
-            }
+            SCOPED_TIMER(_parent->merge_block_timer());
             RETURN_IF_ERROR(_mutable_block->merge(*block));
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org