You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/09 04:10:57 UTC

[doris] 21/29: [Opt](performance) opt the outer join for nested loop join (#20524)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 000f2d663acbd28eae59b3e3ccdd20affd556e59
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Wed Jun 7 17:31:36 2023 +0800

    [Opt](performance) opt the outer join for nested loop join (#20524)
---
 be/src/vec/exec/join/vnested_loop_join_node.cpp | 124 +++++++++++++++---------
 be/src/vec/exec/join/vnested_loop_join_node.h   |  82 +++++++++-------
 2 files changed, 122 insertions(+), 84 deletions(-)

diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index b8160b2ade..17cd2ce22a 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -97,7 +97,6 @@ private:
 VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnode,
                                          const DescriptorTbl& descs)
         : VJoinNodeBase(pool, tnode, descs),
-          _cur_probe_row_visited_flags(false),
           _matched_rows_done(false),
           _left_block_pos(0),
           _left_side_eos(false),
@@ -232,6 +231,9 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block*
 
 Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* block, bool eos) {
     COUNTER_UPDATE(_probe_rows_counter, block->rows());
+    _cur_probe_row_visited_flags.resize(block->rows());
+    std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 0);
+    _left_block_pos = 0;
     _need_more_input_data = false;
     _left_side_eos = eos;
 
@@ -287,22 +289,24 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
             DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
             assert_cast<ColumnNullable*>(dst_columns[i].get())
                     ->get_nested_column_ptr()
-                    ->insert_many_from(*src_column.column, _left_block_pos, 1);
+                    ->insert_range_from(*src_column.column, _left_block_start_pos,
+                                        _left_side_process_count);
             assert_cast<ColumnNullable*>(dst_columns[i].get())
                     ->get_null_map_column()
                     .get_data()
                     .resize_fill(origin_sz + 1, 0);
         } else {
-            dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1);
+            dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos,
+                                              _left_side_process_count);
         }
     }
     for (size_t i = 0; i < _num_build_side_columns; ++i) {
-        dst_columns[_num_probe_side_columns + i]->insert_default();
+        dst_columns[_num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count);
     }
     IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
                                          *dst_columns[dst_columns.size() - 1])
                                          .get_data();
-    mark_data.resize_fill(mark_data.size() + 1, 0);
+    mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0);
 }
 
 void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
@@ -457,54 +461,58 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
         }
         _output_null_idx_build_side = i;
     } else {
-        if constexpr (IsSemi) {
-            if (!_cur_probe_row_visited_flags && !_is_mark_join) {
-                return;
+        if (!_is_mark_join) {
+            auto new_size = column_size;
+            DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
+            for (int j = _left_block_start_pos;
+                 j < _left_block_start_pos + _left_side_process_count; ++j) {
+                if (_cur_probe_row_visited_flags[j] == IsSemi) {
+                    new_size++;
+                    for (size_t i = 0; i < _num_probe_side_columns; ++i) {
+                        const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
+                        if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
+                            DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
+                            assert_cast<ColumnNullable*>(dst_columns[i].get())
+                                    ->get_nested_column_ptr()
+                                    ->insert_many_from(*src_column.column, j, 1);
+                            assert_cast<ColumnNullable*>(dst_columns[i].get())
+                                    ->get_null_map_column()
+                                    .get_data()
+                                    .resize_fill(new_size, 0);
+                        } else {
+                            dst_columns[i]->insert_many_from(*src_column.column, j, 1);
+                        }
+                    }
+                }
             }
-        } else {
-            if (_cur_probe_row_visited_flags && !_is_mark_join) {
-                return;
+            if (new_size > column_size) {
+                for (size_t i = 0; i < _num_build_side_columns; ++i) {
+                    dst_columns[_num_probe_side_columns + i]->insert_many_defaults(new_size -
+                                                                                   column_size);
+                }
+                _resize_fill_tuple_is_null_column(new_size, 0, 1);
             }
-        }
-
-        auto new_size = column_size + 1;
-        if (_is_mark_join) {
+        } else {
             IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
                                                  *dst_columns[dst_columns.size() - 1])
                                                  .get_data();
-            mark_data.resize_fill(mark_data.size() + 1,
-                                  (IsSemi && !_cur_probe_row_visited_flags) ||
-                                                  (!IsSemi && _cur_probe_row_visited_flags)
-                                          ? 0
-                                          : 1);
-        }
-
-        DCHECK_LT(_left_block_pos, _left_block.rows());
-        for (size_t i = 0; i < _num_probe_side_columns; ++i) {
-            const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
-            if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
-                DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
-                assert_cast<ColumnNullable*>(dst_columns[i].get())
-                        ->get_nested_column_ptr()
-                        ->insert_many_from(*src_column.column, _left_block_pos, 1);
-                assert_cast<ColumnNullable*>(dst_columns[i].get())
-                        ->get_null_map_column()
-                        .get_data()
-                        .resize_fill(new_size, 0);
-            } else {
-                dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1);
+            mark_data.reserve(mark_data.size() + _left_side_process_count);
+            DCHECK_LT(_left_block_pos, _left_block.rows());
+            for (int j = _left_block_start_pos;
+                 j < _left_block_start_pos + _left_side_process_count; ++j) {
+                mark_data.emplace_back(IsSemi != _cur_probe_row_visited_flags[j]);
+                for (size_t i = 0; i < _num_probe_side_columns; ++i) {
+                    const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
+                    DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN);
+                    dst_columns[i]->insert_from(*src_column.column, j);
+                }
             }
         }
-        for (size_t i = 0; i < _num_build_side_columns; ++i) {
-            dst_columns[_num_probe_side_columns + i]->insert_default();
-        }
-        _resize_fill_tuple_is_null_column(new_size, 0, 1);
     }
 }
 
 void VNestedLoopJoinNode::_reset_with_next_probe_row() {
     // TODO: need a vector of left block to register the _probe_row_visited_flags
-    _cur_probe_row_visited_flags = false;
     _current_build_pos = 0;
     _left_block_pos++;
 }
@@ -526,8 +534,8 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
                             ->get_data();
             auto* __restrict build_side_flag_data = build_side_flag.data();
             auto cur_sz = build_side_flag.size();
-            const size_t offset = _offset_stack.top();
-            _offset_stack.pop();
+            const size_t offset = _build_offset_stack.top();
+            _build_offset_stack.pop();
             for (size_t j = 0; j < cur_sz; j++) {
                 build_side_flag_data[j] |= filter[offset + j];
             }
@@ -535,7 +543,20 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
         }
     }
     if constexpr (SetProbeSideFlag) {
-        _cur_probe_row_visited_flags |= simd::contain_byte<uint8>(filter.data(), filter.size(), 1);
+        int end = filter.size();
+        for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 : _left_block_pos;
+             i >= _left_block_start_pos; i--) {
+            int offset = 0;
+            if (!_probe_offset_stack.empty()) {
+                offset = _probe_offset_stack.top();
+                _probe_offset_stack.pop();
+            }
+            if (!_cur_probe_row_visited_flags[i]) {
+                _cur_probe_row_visited_flags[i] =
+                        simd::contain_byte<uint8>(filter.data() + offset, end - offset, 1) ? 1 : 0;
+            }
+            end = offset;
+        }
     }
     if (materialize) {
         Block::filter_block_internal(block, filter, column_to_keep);
@@ -554,7 +575,7 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block,
     // 3. Use bool column to do filtering.
     size_t build_block_idx =
             _current_build_pos == 0 ? _build_blocks.size() - 1 : _current_build_pos - 1;
-    size_t processed_blocks_num = _offset_stack.size();
+    size_t processed_blocks_num = _build_offset_stack.size();
     if (LIKELY(!_join_conjuncts.empty() && block->rows() > 0)) {
         IColumn::Filter filter(block->rows(), 1);
         bool can_filter_all = false;
@@ -563,6 +584,11 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block,
 
         if (can_filter_all) {
             CLEAR_BLOCK
+            std::stack<uint16_t> empty1;
+            _probe_offset_stack.swap(empty1);
+
+            std::stack<uint16_t> empty2;
+            _build_offset_stack.swap(empty2);
         } else {
             _do_filtering_and_update_visited_flags_impl<decltype(filter), SetBuildSideFlag,
                                                         SetProbeSideFlag>(
@@ -577,14 +603,16 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block,
                                 ->get_data();
                 auto* __restrict build_side_flag_data = build_side_flag.data();
                 auto cur_sz = build_side_flag.size();
-                _offset_stack.pop();
+                _build_offset_stack.pop();
                 memset(reinterpret_cast<void*>(build_side_flag_data), 1, cur_sz);
                 build_block_idx =
                         build_block_idx == 0 ? _build_blocks.size() - 1 : build_block_idx - 1;
             }
         }
         if constexpr (SetProbeSideFlag) {
-            _cur_probe_row_visited_flags = true;
+            std::stack<uint16_t> empty;
+            _probe_offset_stack.swap(empty);
+            std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 1);
         }
         if (!materialize) {
             CLEAR_BLOCK
@@ -676,7 +704,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block,
 }
 
 bool VNestedLoopJoinNode::need_more_input_data() const {
-    return _need_more_input_data and !_left_side_eos;
+    return _need_more_input_data and !_left_side_eos and _join_block.rows() == 0;
 }
 
 void VNestedLoopJoinNode::release_resource(doris::RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h
index 4bd66798d9..03676629bc 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -94,31 +94,46 @@ private:
         constexpr bool ignore_null = JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
                                      JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
                                      JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN;
+        _left_block_start_pos = _left_block_pos;
+        _left_side_process_count = 0;
+        DCHECK(!_need_more_input_data || !_matched_rows_done);
 
         MutableBlock mutable_join_block(&_join_block);
+        if (!_matched_rows_done && !_need_more_input_data) {
+            // We should try to join rows if there still are some rows from probe side.
+            while (_join_block.rows() < state->batch_size()) {
+                while (_current_build_pos == _build_blocks.size() ||
+                       _left_block_pos == _left_block.rows()) {
+                    // Only count the current probe row if it actually exists (the left block may be empty or already exhausted).
+                    if (_left_block.rows() > _left_block_pos) {
+                        _left_side_process_count++;
+                    }
 
-        while (_join_block.rows() < state->batch_size() && !_matched_rows_done) {
-            // If this left block is exhausted or empty, we need to pull data from left child.
-            if (_left_block_pos == _left_block.rows()) {
-                if (_left_side_eos) {
-                    _matched_rows_done = true;
-                } else {
-                    _left_block_pos = 0;
-                    _need_more_input_data = true;
-                    return Status::OK();
+                    _reset_with_next_probe_row();
+                    if (_left_block_pos < _left_block.rows()) {
+                        if constexpr (set_probe_side_flag) {
+                            _probe_offset_stack.push(mutable_join_block.rows());
+                        }
+                    } else {
+                        if (_left_side_eos) {
+                            _matched_rows_done = true;
+                        } else {
+                            _need_more_input_data = true;
+                        }
+                        break;
+                    }
                 }
-            }
 
-            // We should try to join rows if there still are some rows from probe side.
-            if (!_matched_rows_done && _current_build_pos < _build_blocks.size()) {
-                do {
-                    const auto& now_process_build_block = _build_blocks[_current_build_pos++];
-                    if constexpr (set_build_side_flag) {
-                        _offset_stack.push(mutable_join_block.rows());
-                    }
-                    _process_left_child_block(mutable_join_block, now_process_build_block);
-                } while (_join_block.rows() < state->batch_size() &&
-                         _current_build_pos < _build_blocks.size());
+                // No probe-side rows are left to process in this call; stop joining.
+                if (_matched_rows_done || _need_more_input_data) {
+                    break;
+                }
+
+                const auto& now_process_build_block = _build_blocks[_current_build_pos++];
+                if constexpr (set_build_side_flag) {
+                    _build_offset_stack.push(mutable_join_block.rows());
+                }
+                _process_left_child_block(mutable_join_block, now_process_build_block);
             }
 
             if constexpr (set_probe_side_flag) {
@@ -133,28 +148,20 @@ private:
                 }
                 mutable_join_block = MutableBlock(&_join_block);
                 // If this join operation is left outer join or full outer join, when
-                // `_current_build_pos == _build_blocks.size()`, means all rows from build
-                // side have been joined with the current probe row, we should output current
+                // `_left_side_process_count` is non-zero, all rows from the build
+                // side have been joined with those probe rows, so we should output each unmatched
                 // probe row with null from build side.
-                if (_current_build_pos == _build_blocks.size()) {
-                    if (!_matched_rows_done) {
-                        _finalize_current_phase<false,
-                                                JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>(
-                                mutable_join_block, state->batch_size());
-                        _reset_with_next_probe_row();
-                    }
-                    break;
+                if (_left_side_process_count) {
+                    _finalize_current_phase<false, JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>(
+                            mutable_join_block, state->batch_size());
                 }
             }
 
-            if (!_matched_rows_done && _current_build_pos == _build_blocks.size()) {
+            if (_left_side_process_count) {
                 if (_is_mark_join && _build_blocks.empty()) {
                     DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN);
                     _append_left_data_with_null(mutable_join_block);
-                    _reset_with_next_probe_row();
-                    break;
                 }
-                _reset_with_next_probe_row();
             }
         }
 
@@ -221,7 +228,7 @@ private:
     // Visited flags for each row in build side.
     MutableColumns _build_side_visited_flags;
     // Visited flags for current row in probe side.
-    bool _cur_probe_row_visited_flags;
+    std::vector<int8_t> _cur_probe_row_visited_flags;
     size_t _current_build_pos = 0;
 
     size_t _num_probe_side_columns = 0;
@@ -238,8 +245,10 @@ private:
     // is responsible for.
     Block _left_block;
 
+    int _left_block_start_pos = 0;
     int _left_block_pos; // current scan pos in _left_block
     bool _left_side_eos; // if true, left child has no more rows to process
+    int _left_side_process_count = 0;
 
     bool _old_version_flag;
 
@@ -249,7 +258,8 @@ private:
     VExprContextSPtrs _filter_src_expr_ctxs;
     bool _is_output_left_side_only = false;
     bool _need_more_input_data = true;
-    std::stack<uint16_t> _offset_stack;
+    std::stack<uint16_t> _build_offset_stack;
+    std::stack<uint16_t> _probe_offset_stack;
     VExprContextSPtrs _join_conjuncts;
 
     friend struct RuntimeFilterBuild;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org