You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/13 01:06:58 UTC

[doris] branch master updated: [Refactor](Profile) Add and refactor the join profile (#20693)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 51bbf17786 [Refactor](Profile) Add and refactor the join profile (#20693)
51bbf17786 is described below

commit 51bbf177864f25725043dd7d33d815763cb7885c
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue Jun 13 09:06:51 2023 +0800

    [Refactor](Profile) Add and refactor the join profile (#20693)
---
 be/src/exec/exec_node.cpp                          |  1 -
 .../vec/exec/join/process_hash_table_probe_impl.h  |  2 ++
 be/src/vec/exec/join/vhash_join_node.cpp           | 12 ++-------
 be/src/vec/exec/join/vhash_join_node.h             |  3 +--
 be/src/vec/exec/join/vjoin_node_base.cpp           | 21 ++++++++++++++++
 be/src/vec/exec/join/vjoin_node_base.h             |  8 +++++-
 be/src/vec/exec/join/vnested_loop_join_node.cpp    | 29 +++++++++++-----------
 7 files changed, 48 insertions(+), 28 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 9d731193c0..2707e47a76 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -145,7 +145,6 @@ Status ExecNode::prepare(RuntimeState* state) {
     for (int i = 0; i < _children.size(); ++i) {
         RETURN_IF_ERROR(_children[i]->prepare(state));
     }
-
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 5ef335ea52..adaefbc9f8 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -502,6 +502,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
         }
         int multi_matched_output_row_count = 0;
         if (current_offset < _batch_size) {
+            SCOPED_TIMER(_search_hashtable_timer);
             while (probe_index < probe_rows) {
                 // ignore null rows
                 if constexpr (ignore_null && need_null_map_for_probe) {
@@ -677,6 +678,7 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
         // dispose the other join conjunct exec
         auto row_count = output_block->rows();
         if (row_count) {
+            SCOPED_TIMER(_join_node->_process_other_join_conjunct_timer);
             int orig_columns = output_block->columns();
             IColumn::Filter other_conjunct_filter(row_count, 1);
             bool can_be_filter_all;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 2dfb93a98d..822b99db58 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -449,11 +449,6 @@ Status HashJoinNode::prepare(RuntimeState* state) {
             "ProbeKeyArena", TUnit::BYTES, "MemoryUsage");
 
     // Build phase
-    _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);
-    runtime_profile()->add_child(_build_phase_profile, false, nullptr);
-    _build_get_next_timer = ADD_TIMER(_build_phase_profile, "BuildGetNextTime");
-    _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime");
-
     auto record_profile = _should_build_hash_table ? _build_phase_profile : faker_runtime_profile();
     _build_table_timer = ADD_TIMER(record_profile, "BuildTableTime");
     _build_side_merge_block_timer = ADD_TIMER(record_profile, "BuildSideMergeBlockTime");
@@ -461,25 +456,22 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _build_expr_call_timer = ADD_TIMER(record_profile, "BuildExprCallTime");
     _build_table_expanse_timer = ADD_TIMER(record_profile, "BuildTableExpanseTime");
     _build_table_convert_timer = ADD_TIMER(record_profile, "BuildTableConvertToPartitionedTime");
-    _build_rows_counter = ADD_COUNTER(record_profile, "BuildRows", TUnit::UNIT);
     _build_side_compute_hash_timer = ADD_TIMER(record_profile, "BuildSideHashComputingTime");
     _build_runtime_filter_timer = ADD_TIMER(record_profile, "BuildRuntimeFilterTime");
     _push_down_timer = ADD_TIMER(record_profile, "PublishRuntimeFilterTime");
     _push_compute_timer = ADD_TIMER(record_profile, "PushDownComputeTime");
 
     // Probe phase
-    auto probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true);
-    _probe_timer = ADD_TIMER(probe_phase_profile, "ProbeTime");
+    auto probe_phase_profile = _probe_phase_profile;
     _probe_next_timer = ADD_TIMER(probe_phase_profile, "ProbeFindNextTime");
     _probe_expr_call_timer = ADD_TIMER(probe_phase_profile, "ProbeExprCallTime");
-    _probe_rows_counter = ADD_COUNTER(probe_phase_profile, "ProbeRows", TUnit::UNIT);
     _search_hashtable_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenSearchHashTableTime");
     _build_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenBuildSideOutputTime");
     _probe_side_output_timer = ADD_TIMER(probe_phase_profile, "ProbeWhenProbeSideOutputTime");
 
-    _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
     _open_timer = ADD_TIMER(runtime_profile(), "OpenTime");
     _allocate_resource_timer = ADD_TIMER(runtime_profile(), "AllocateResourceTime");
+    _process_other_join_conjunct_timer = ADD_TIMER(runtime_profile(), "OtherJoinConjunctTime");
 
     _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", TUnit::UNIT);
     _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), "FilledBuckets", TUnit::UNIT);
diff --git a/be/src/vec/exec/join/vhash_join_node.h b/be/src/vec/exec/join/vhash_join_node.h
index 286a0783a6..36398ba4ad 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -303,6 +303,7 @@ private:
 
     RuntimeProfile::Counter* _open_timer;
     RuntimeProfile::Counter* _allocate_resource_timer;
+    RuntimeProfile::Counter* _process_other_join_conjunct_timer;
 
     RuntimeProfile::Counter* _memory_usage_counter;
     RuntimeProfile::Counter* _build_blocks_memory_usage;
@@ -310,8 +311,6 @@ private:
     RuntimeProfile::HighWaterMarkCounter* _build_arena_memory_usage;
     RuntimeProfile::HighWaterMarkCounter* _probe_arena_memory_usage;
 
-    RuntimeProfile* _build_phase_profile;
-
     std::shared_ptr<Arena> _arena;
 
     // maybe share hash table with other fragment instances
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp b/be/src/vec/exec/join/vjoin_node_base.cpp
index 3bc1d93c8b..cc50299db5 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -105,6 +105,26 @@ VJoinNodeBase::VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const Des
     }
 }
 
+Status VJoinNodeBase::prepare(RuntimeState* state) {
+    RETURN_IF_ERROR(ExecNode::prepare(state));
+    _build_phase_profile = runtime_profile()->create_child("BuildPhase", true, true);
+    runtime_profile()->add_child(_build_phase_profile, false, nullptr);
+    _build_get_next_timer = ADD_TIMER(_build_phase_profile, "BuildGetNextTime");
+    _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime");
+    _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows", TUnit::UNIT);
+
+    _probe_phase_profile = runtime_profile()->create_child("ProbePhase", true, true);
+    _probe_timer = ADD_TIMER(_probe_phase_profile, "ProbeTime");
+    _probe_rows_counter = ADD_COUNTER(_probe_phase_profile, "ProbeRows", TUnit::UNIT);
+
+    _build_output_block_timer = ADD_TIMER(runtime_profile(), "BuildOutPutBlockTimer");
+    _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
+    _push_down_timer = ADD_TIMER(runtime_profile(), "PublishRuntimeFilterTime");
+    _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
+
+    return Status::OK();
+}
+
 Status VJoinNodeBase::close(RuntimeState* state) {
     return ExecNode::close(state);
 }
@@ -131,6 +151,7 @@ void VJoinNodeBase::_construct_mutable_join_block() {
 }
 
 Status VJoinNodeBase::_build_output_block(Block* origin_block, Block* output_block) {
+    SCOPED_TIMER(_build_output_block_timer);
     auto is_mem_reuse = output_block->mem_reuse();
     MutableBlock mutable_block =
             is_mem_reuse
diff --git a/be/src/vec/exec/join/vjoin_node_base.h b/be/src/vec/exec/join/vjoin_node_base.h
index ce644c159e..f29897719d 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -57,6 +57,8 @@ class VJoinNodeBase : public ExecNode {
 public:
     VJoinNodeBase(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
 
+    virtual Status prepare(RuntimeState* state) override;
+
     virtual Status close(RuntimeState* state) override;
 
     virtual Status open(RuntimeState* state) override;
@@ -130,14 +132,18 @@ protected:
     MutableColumnPtr _tuple_is_null_left_flag_column;
     MutableColumnPtr _tuple_is_null_right_flag_column;
 
+    RuntimeProfile* _build_phase_profile;
     RuntimeProfile::Counter* _build_timer;
     RuntimeProfile::Counter* _build_get_next_timer;
-    RuntimeProfile::Counter* _probe_timer;
     RuntimeProfile::Counter* _build_rows_counter;
+
+    RuntimeProfile* _probe_phase_profile;
+    RuntimeProfile::Counter* _probe_timer;
     RuntimeProfile::Counter* _probe_rows_counter;
     RuntimeProfile::Counter* _push_down_timer;
     RuntimeProfile::Counter* _push_compute_timer;
     RuntimeProfile::Counter* _join_filter_timer;
+    RuntimeProfile::Counter* _build_output_block_timer;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 17cd2ce22a..57ad81f120 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -135,13 +135,8 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
 
-    _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
-    _build_rows_counter = ADD_COUNTER(runtime_profile(), "BuildRows", TUnit::UNIT);
     _probe_rows_counter = ADD_COUNTER(runtime_profile(), "ProbeRows", TUnit::UNIT);
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
-    _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
-    _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
-    _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
 
     // pre-compute the tuple index of build tuples in the output row
     int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
@@ -174,21 +169,27 @@ Status VNestedLoopJoinNode::close(RuntimeState* state) {
     return VJoinNodeBase::close(state);
 }
 
+// TODO: This method should be implemented by the parent class
 Status VNestedLoopJoinNode::_materialize_build_side(RuntimeState* state) {
     // Do a full scan of child(1) and store all build row batches.
-    RETURN_IF_ERROR(child(1)->open(state));
+    {
+        SCOPED_TIMER(_build_get_next_timer);
+        RETURN_IF_ERROR(child(1)->open(state));
+    }
 
     bool eos = false;
+    Block block;
     while (true) {
         RETURN_IF_CANCELLED(state);
-
-        Block block;
-        RETURN_IF_ERROR(child(1)->get_next_after_projects(
-                state, &block, &eos,
-                std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
-                                  ExecNode::get_next,
-                          _children[1], std::placeholders::_1, std::placeholders::_2,
-                          std::placeholders::_3)));
+        {
+            SCOPED_TIMER(_build_get_next_timer);
+            RETURN_IF_ERROR(child(1)->get_next_after_projects(
+                    state, &block, &eos,
+                    std::bind((Status(ExecNode::*)(RuntimeState*, vectorized::Block*, bool*)) &
+                                      ExecNode::get_next,
+                              _children[1], std::placeholders::_1, std::placeholders::_2,
+                              std::placeholders::_3)));
+        }
 
         sink(state, &block, eos);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org