You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2023/06/12 03:34:56 UTC

[doris] branch master updated: [improvement](profile)add sum/avg rpc time (#20511)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 14f59bef1d [improvement](profile)add sum/avg rpc time  (#20511)
14f59bef1d is described below

commit 14f59bef1d93fcfa947bfc9720dabdbeb4c04118
Author: Mryange <59...@users.noreply.github.com>
AuthorDate: Mon Jun 12 11:34:49 2023 +0800

    [improvement](profile)add sum/avg rpc time  (#20511)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp   | 45 ++++++++++++++++++-------
 be/src/pipeline/exec/exchange_sink_buffer.h     |  7 ++--
 be/src/pipeline/exec/exchange_sink_operator.cpp |  9 +----
 be/src/pipeline/exec/exchange_sink_operator.h   |  3 --
 be/src/vec/sink/vdata_stream_sender.h           |  5 ++-
 5 files changed, 43 insertions(+), 26 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 2b72ba5729..c0f0e921e9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -28,6 +28,7 @@
 #include <stddef.h>
 
 #include <atomic>
+#include <cstdint>
 #include <exception>
 #include <functional>
 #include <memory>
@@ -305,25 +306,28 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
 
 void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
     int64_t local_max_time = 0;
-    int64_t local_min_time = 0;
-    auto iter = _instance_to_rpc_time.begin();
-    if (iter != _instance_to_rpc_time.end()) {
-        local_max_time = iter->second;
-        local_min_time = iter->second;
-        iter++;
-    }
-    while (iter != _instance_to_rpc_time.end()) {
-        int64_t cur_val = iter->second;
-        local_max_time = cur_val > local_max_time ? cur_val : local_max_time;
-        local_min_time = cur_val < local_min_time ? cur_val : local_min_time;
-        iter++;
+    int64_t local_min_time = INT64_MAX;
+    for (auto& [id, time] : _instance_to_rpc_time) {
+        if (time != 0) {
+            local_max_time = std::max(local_max_time, time);
+            local_min_time = std::min(local_min_time, time);
+        }
     }
     *max_time = local_max_time;
     *min_time = local_min_time;
 }
 
+int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
+    int64_t sum_time = 0;
+    for (auto& [id, time] : _instance_to_rpc_time) {
+        sum_time += time;
+    }
+    return sum_time;
+}
+
 void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
                                       int64_t receive_rpc_time) {
+    _rpc_count++;
     int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
     DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
     if (rpc_spend_time > 0) {
@@ -331,4 +335,21 @@ void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
     }
 }
 
+void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
+    auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
+    auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
+    auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
+    auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
+    auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
+
+    int64_t max_rpc_time = 0, min_rpc_time = 0;
+    get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
+    _max_rpc_timer->set(max_rpc_time);
+    _min_rpc_timer->set(min_rpc_time);
+
+    _count_rpc->set(_rpc_count);
+    int64_t sum_time = get_sum_rpc_time();
+    _sum_rpc_timer->set(sum_time);
+    _avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1), _rpc_count.load()));
+}
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 0518a4e0cd..ccc5e2afff 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -32,6 +32,7 @@
 
 #include "common/global_types.h"
 #include "common/status.h"
+#include "runtime/runtime_state.h"
 #include "service/backend_options.h"
 
 namespace doris {
@@ -163,8 +164,8 @@ public:
     bool can_write() const;
     bool is_pending_finish() const;
     void close();
-    void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
     void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
+    void update_profile(RuntimeProfile* profile);
 
 private:
     phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
@@ -191,7 +192,7 @@ private:
     // Sender instance id, unique within a fragment. StreamSender save the variable
     int _sender_id;
     int _be_number;
-
+    std::atomic<int64_t> _rpc_count = 0;
     PipelineFragmentContext* _context;
 
     Status _send_rpc(InstanceLoId);
@@ -201,6 +202,8 @@ private:
     inline void _failed(InstanceLoId id, const std::string& err);
     inline void _set_receiver_eof(InstanceLoId id);
     inline bool _is_receiver_eof(InstanceLoId id);
+    void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
+    int64_t get_sum_rpc_time();
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 6862d45a42..dd0a35ce95 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -24,7 +24,6 @@
 #include "common/status.h"
 #include "exchange_sink_buffer.h"
 #include "pipeline/exec/operator.h"
-#include "runtime/runtime_state.h"
 #include "vec/sink/vdata_stream_sender.h"
 
 namespace doris {
@@ -70,8 +69,6 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
 
     RETURN_IF_ERROR(DataSinkOperator::prepare(state));
     _sink->registe_channels(_sink_buffer.get());
-    _max_rpc_timer = ADD_TIMER(_sink->profile(), "MaxRpcTime");
-    _min_rpc_timer = ADD_TIMER(_sink->profile(), "MinRpcTime");
     return Status::OK();
 }
 
@@ -84,12 +81,8 @@ bool ExchangeSinkOperator::is_pending_finish() const {
 }
 
 Status ExchangeSinkOperator::close(RuntimeState* state) {
-    int64_t max_rpc_time = 0;
-    int64_t min_rpc_time = 0;
-    _sink_buffer->get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
-    _max_rpc_timer->set(max_rpc_time);
-    _min_rpc_timer->set(min_rpc_time);
     RETURN_IF_ERROR(DataSinkOperator::close(state));
+    _sink_buffer->update_profile(_sink->profile());
     _sink_buffer->close();
     return Status::OK();
 }
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 04ccd81955..0ebb8b3e9e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -68,9 +68,6 @@ private:
     RuntimeState* _state = nullptr;
     PipelineFragmentContext* _context;
     int _mult_cast_id = -1;
-
-    RuntimeProfile::Counter* _max_rpc_timer = nullptr;
-    RuntimeProfile::Counter* _min_rpc_timer = nullptr;
 };
 
 } // namespace pipeline
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 7c1b423083..cb4d8fab3d 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -108,6 +108,7 @@ public:
 
 protected:
     friend class Channel;
+    friend class PipChannel;
     friend class pipeline::ExchangeSinkBuffer;
 
     void _roll_pb_block();
@@ -437,6 +438,7 @@ public:
     // rpc (or OK if there wasn't one that hasn't been reported yet).
     // if batch is nullptr, send the eof packet
     Status send_block(PBlock* block, bool eos = false) override {
+        COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
         std::unique_ptr<PBlock> pblock_ptr;
         pblock_ptr.reset(block);
 
@@ -454,6 +456,7 @@ public:
     }
 
     Status send_block(BroadcastPBlockHolder* block, bool eos = false) override {
+        COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
         if (eos) {
             if (_eos_send) {
                 return Status::OK();
@@ -472,7 +475,7 @@ public:
         if (is_local()) {
             return send_local_block(eos);
         }
-
+        SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         auto block_ptr = std::make_unique<PBlock>();
         if (_mutable_block) {
             auto block = _mutable_block->to_block();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org