You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/12/20 04:07:46 UTC
[doris] branch master updated: [profile](datasender) add more detail profile in data stream sender (#15176)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a2d56af7d9 [profile](datasender) add more detail profile in data stream sender (#15176)
a2d56af7d9 is described below
commit a2d56af7d9cbaec75178ce2c9bf50c27f546749f
Author: yiguolei <67...@qq.com>
AuthorDate: Tue Dec 20 12:07:34 2022 +0800
[profile](datasender) add more detail profile in data stream sender (#15176)
* [profile](datasender) add more detail profile in data stream sender
Co-authored-by: yiguolei <yi...@gmail.com>
---
be/src/vec/sink/vdata_stream_sender.cpp | 27 ++++++++++++++++++++++++++-
be/src/vec/sink/vdata_stream_sender.h | 8 ++++++--
2 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 69f2d4d99b..30ce7be6ac 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -106,6 +106,7 @@ Status Channel::send_current_block(bool eos) {
}
Status Channel::send_local_block(bool eos) {
+ SCOPED_TIMER(_parent->_local_send_timer);
std::shared_ptr<VDataStreamRecvr> recvr =
_parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
_dest_node_id);
@@ -114,6 +115,7 @@ Status Channel::send_local_block(bool eos) {
if (recvr != nullptr) {
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
+ COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
recvr->add_block(&block, _parent->_sender_id, true);
if (eos) {
recvr->remove_sender(_parent->_sender_id, _be_number);
@@ -123,12 +125,14 @@ Status Channel::send_local_block(bool eos) {
}
Status Channel::send_local_block(Block* block) {
+ SCOPED_TIMER(_parent->_local_send_timer);
std::shared_ptr<VDataStreamRecvr> recvr =
_parent->state()->exec_env()->vstream_mgr()->find_recvr(_fragment_instance_id,
_dest_node_id);
if (recvr != nullptr) {
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block->bytes());
COUNTER_UPDATE(_parent->_local_sent_rows, block->rows());
+ COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
recvr->add_block(block, _parent->_sender_id, false);
}
return Status::OK();
@@ -136,6 +140,7 @@ Status Channel::send_local_block(Block* block) {
Status Channel::send_block(PBlock* block, bool eos) {
SCOPED_TIMER(_parent->_brpc_send_timer);
+ COUNTER_UPDATE(_parent->_blocks_sent_counter, 1);
if (_closure == nullptr) {
_closure = new RefCountClosure<PTransmitDataResult>();
_closure->ref();
@@ -201,6 +206,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
{
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
+ SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
_mutable_block->add_rows(block, begin, begin + row_add);
}
@@ -208,7 +214,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
begin += row_add;
if (row_add == max_add) {
- RETURN_IF_ERROR(send_current_block());
+ RETURN_IF_ERROR(send_current_block(false));
}
}
@@ -271,6 +277,10 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
_profile(nullptr),
_serialize_batch_timer(nullptr),
_bytes_sent_counter(nullptr),
+ _local_send_timer(nullptr),
+ _split_block_hash_compute_timer(nullptr),
+ _split_block_distribute_by_channel_timer(nullptr),
+ _blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(sink.dest_node_id),
_transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
@@ -327,6 +337,10 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_brpc_send_timer(nullptr),
_brpc_wait_timer(nullptr),
_bytes_sent_counter(nullptr),
+ _local_send_timer(nullptr),
+ _split_block_hash_compute_timer(nullptr),
+ _split_block_distribute_by_channel_timer(nullptr),
+ _blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
_cur_pb_block = &_pb_block1;
@@ -347,6 +361,10 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_
_brpc_send_timer(nullptr),
_brpc_wait_timer(nullptr),
_bytes_sent_counter(nullptr),
+ _local_send_timer(nullptr),
+ _split_block_hash_compute_timer(nullptr),
+ _split_block_distribute_by_channel_timer(nullptr),
+ _blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
_cur_pb_block = &_pb_block1;
@@ -432,6 +450,11 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_compress_timer = ADD_TIMER(profile(), "CompressTime");
_brpc_send_timer = ADD_TIMER(profile(), "BrpcSendTime");
_brpc_wait_timer = ADD_TIMER(profile(), "BrpcSendTime.Wait");
+ _local_send_timer = ADD_TIMER(profile(), "LocalSendTime");
+ _split_block_hash_compute_timer = ADD_TIMER(profile(), "SplitBlockHashComputeTime");
+ _split_block_distribute_by_channel_timer =
+ ADD_TIMER(profile(), "SplitBlockDistributeByChannelTime");
+ _blocks_sent_counter = ADD_COUNTER(profile(), "BlocksSent", TUnit::UNIT);
_overall_throughput = profile()->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second, _bytes_sent_counter,
@@ -530,6 +553,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
// TODO: after we support the new shuffle hash method, we should simplify this code
if (_part_type == TPartitionType::HASH_PARTITIONED) {
if (!_new_shuffle_hash_method) {
+ SCOPED_TIMER(_split_block_hash_compute_timer);
// for each row, we have a siphash val
std::vector<SipHash> siphashs(rows);
// result[j] means column index, i means rows index
@@ -540,6 +564,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
hashes[i] = siphashs[i].get64() % element_size;
}
} else {
+ SCOPED_TIMER(_split_block_hash_compute_timer);
// result[j] means column index, i means rows index, here to calculate the xxhash value
for (int j = 0; j < result_size; ++j) {
block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 7b0239764a..7cc13162ef 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -154,6 +154,10 @@ protected:
RuntimeProfile::Counter* _uncompressed_bytes_counter;
RuntimeProfile::Counter* _ignore_rows;
RuntimeProfile::Counter* _local_sent_rows;
+ RuntimeProfile::Counter* _local_send_timer;
+ RuntimeProfile::Counter* _split_block_hash_compute_timer;
+ RuntimeProfile::Counter* _split_block_distribute_by_channel_timer;
+ RuntimeProfile::Counter* _blocks_sent_counter;
std::unique_ptr<MemTracker> _mem_tracker;
@@ -230,7 +234,7 @@ public:
Status add_rows(Block* block, const std::vector<int>& row);
- virtual Status send_current_block(bool eos = false);
+ virtual Status send_current_block(bool eos);
Status send_local_block(bool eos = false);
@@ -388,7 +392,7 @@ public:
}
// send _mutable_block
- Status send_current_block(bool eos = false) override {
+ Status send_current_block(bool eos) override {
if (_enable_local_exchange && is_local()) {
return send_local_block(eos);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org