You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2023/06/12 03:34:56 UTC
[doris] branch master updated: [improvement](profile)add sum/avg rpc time (#20511)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 14f59bef1d [improvement](profile)add sum/avg rpc time (#20511)
14f59bef1d is described below
commit 14f59bef1d93fcfa947bfc9720dabdbeb4c04118
Author: Mryange <59...@users.noreply.github.com>
AuthorDate: Mon Jun 12 11:34:49 2023 +0800
[improvement](profile)add sum/avg rpc time (#20511)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 45 ++++++++++++++++++-------
be/src/pipeline/exec/exchange_sink_buffer.h | 7 ++--
be/src/pipeline/exec/exchange_sink_operator.cpp | 9 +----
be/src/pipeline/exec/exchange_sink_operator.h | 3 --
be/src/vec/sink/vdata_stream_sender.h | 5 ++-
5 files changed, 43 insertions(+), 26 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 2b72ba5729..c0f0e921e9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -28,6 +28,7 @@
#include <stddef.h>
#include <atomic>
+#include <cstdint>
#include <exception>
#include <functional>
#include <memory>
@@ -305,25 +306,28 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) {
int64_t local_max_time = 0;
- int64_t local_min_time = 0;
- auto iter = _instance_to_rpc_time.begin();
- if (iter != _instance_to_rpc_time.end()) {
- local_max_time = iter->second;
- local_min_time = iter->second;
- iter++;
- }
- while (iter != _instance_to_rpc_time.end()) {
- int64_t cur_val = iter->second;
- local_max_time = cur_val > local_max_time ? cur_val : local_max_time;
- local_min_time = cur_val < local_min_time ? cur_val : local_min_time;
- iter++;
+ int64_t local_min_time = INT64_MAX;
+ for (auto& [id, time] : _instance_to_rpc_time) {
+ if (time != 0) {
+ local_max_time = std::max(local_max_time, time);
+ local_min_time = std::min(local_min_time, time);
+ }
}
*max_time = local_max_time;
*min_time = local_min_time;
}
+// Returns the sum of every value stored in _instance_to_rpc_time
+// (0 when the map is empty).
+int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
+ int64_t total = 0;
+ for (const auto& entry : _instance_to_rpc_time) {
+ total += entry.second;
+ }
+ return total;
+}
+
void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
int64_t receive_rpc_time) {
+ _rpc_count++;
int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
if (rpc_spend_time > 0) {
@@ -331,4 +335,21 @@ void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
}
}
+// Registers the RPC statistics counters on `profile` and fills them in:
+// RpcMaxTime / RpcMinTime (from get_max_min_rpc_time), RpcSumTime
+// (from get_sum_rpc_time), RpcCount (from _rpc_count) and RpcAvgTime
+// (sum divided by count).
+void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
+ // Plain local names: these are locals, not members, despite the file's `_name` member style.
+ auto* max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
+ auto* min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
+ auto* sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
+ auto* rpc_counter = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
+ auto* avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
+
+ int64_t max_rpc_time = 0;
+ int64_t min_rpc_time = 0;
+ get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
+ max_rpc_timer->set(max_rpc_time);
+ min_rpc_timer->set(min_rpc_time);
+
+ // Read the atomic once so RpcCount and RpcAvgTime are derived from the same value.
+ const int64_t rpc_count = _rpc_count.load();
+ const int64_t sum_time = get_sum_rpc_time();
+ rpc_counter->set(rpc_count);
+ sum_rpc_timer->set(sum_time);
+ // Clamp the divisor to 1 so a zero count yields an average of 0 instead of dividing by zero.
+ avg_rpc_timer->set(sum_time / std::max<int64_t>(1, rpc_count));
+}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h
index 0518a4e0cd..ccc5e2afff 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -32,6 +32,7 @@
#include "common/global_types.h"
#include "common/status.h"
+#include "runtime/runtime_state.h"
#include "service/backend_options.h"
namespace doris {
@@ -163,8 +164,8 @@ public:
bool can_write() const;
bool is_pending_finish() const;
void close();
- void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t receive_rpc_time);
+ void update_profile(RuntimeProfile* profile);
private:
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
@@ -191,7 +192,7 @@ private:
// Sender instance id, unique within a fragment. StreamSender save the variable
int _sender_id;
int _be_number;
-
+ std::atomic<int64_t> _rpc_count = 0;
PipelineFragmentContext* _context;
Status _send_rpc(InstanceLoId);
@@ -201,6 +202,8 @@ private:
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
+ void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
+ int64_t get_sum_rpc_time();
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 6862d45a42..dd0a35ce95 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -24,7 +24,6 @@
#include "common/status.h"
#include "exchange_sink_buffer.h"
#include "pipeline/exec/operator.h"
-#include "runtime/runtime_state.h"
#include "vec/sink/vdata_stream_sender.h"
namespace doris {
@@ -70,8 +69,6 @@ Status ExchangeSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperator::prepare(state));
_sink->registe_channels(_sink_buffer.get());
- _max_rpc_timer = ADD_TIMER(_sink->profile(), "MaxRpcTime");
- _min_rpc_timer = ADD_TIMER(_sink->profile(), "MinRpcTime");
return Status::OK();
}
@@ -84,12 +81,8 @@ bool ExchangeSinkOperator::is_pending_finish() const {
}
Status ExchangeSinkOperator::close(RuntimeState* state) {
- int64_t max_rpc_time = 0;
- int64_t min_rpc_time = 0;
- _sink_buffer->get_max_min_rpc_time(&max_rpc_time, &min_rpc_time);
- _max_rpc_timer->set(max_rpc_time);
- _min_rpc_timer->set(min_rpc_time);
RETURN_IF_ERROR(DataSinkOperator::close(state));
+ _sink_buffer->update_profile(_sink->profile()); // Fill the sink profile's RPC counters before the buffer is closed. NOTE(review): presumably not reached when the close above fails (RETURN_IF_ERROR) - confirm.
_sink_buffer->close();
return Status::OK();
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 04ccd81955..0ebb8b3e9e 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -68,9 +68,6 @@ private:
RuntimeState* _state = nullptr;
PipelineFragmentContext* _context;
int _mult_cast_id = -1;
-
- RuntimeProfile::Counter* _max_rpc_timer = nullptr;
- RuntimeProfile::Counter* _min_rpc_timer = nullptr;
};
} // namespace pipeline
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 7c1b423083..cb4d8fab3d 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -108,6 +108,7 @@ public:
protected:
friend class Channel;
+ friend class PipChannel;
friend class pipeline::ExchangeSinkBuffer;
void _roll_pb_block();
@@ -437,6 +438,7 @@ public:
// rpc (or OK if there wasn't one that hasn't been reported yet).
// if batch is nullptr, send the eof packet
Status send_block(PBlock* block, bool eos = false) override {
+ COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
std::unique_ptr<PBlock> pblock_ptr;
pblock_ptr.reset(block);
@@ -454,6 +456,7 @@ public:
}
Status send_block(BroadcastPBlockHolder* block, bool eos = false) override {
+ COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
if (eos) {
if (_eos_send) {
return Status::OK();
@@ -472,7 +475,7 @@ public:
if (is_local()) {
return send_local_block(eos);
}
-
+ SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
auto block_ptr = std::make_unique<PBlock>();
if (_mutable_block) {
auto block = _mutable_block->to_block();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org