You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/09 04:10:57 UTC
[doris] 21/29: [Opt](performance) opt the outer join for nested loop join (#20524)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 000f2d663acbd28eae59b3e3ccdd20affd556e59
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Wed Jun 7 17:31:36 2023 +0800
[Opt](performance) opt the outer join for nested loop join (#20524)
---
be/src/vec/exec/join/vnested_loop_join_node.cpp | 124 +++++++++++++++---------
be/src/vec/exec/join/vnested_loop_join_node.h | 82 +++++++++-------
2 files changed, 122 insertions(+), 84 deletions(-)
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index b8160b2ade..17cd2ce22a 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -97,7 +97,6 @@ private:
VNestedLoopJoinNode::VNestedLoopJoinNode(ObjectPool* pool, const TPlanNode& tnode,
const DescriptorTbl& descs)
: VJoinNodeBase(pool, tnode, descs),
- _cur_probe_row_visited_flags(false),
_matched_rows_done(false),
_left_block_pos(0),
_left_side_eos(false),
@@ -232,6 +231,9 @@ Status VNestedLoopJoinNode::sink(doris::RuntimeState* state, vectorized::Block*
Status VNestedLoopJoinNode::push(doris::RuntimeState* state, vectorized::Block* block, bool eos) {
COUNTER_UPDATE(_probe_rows_counter, block->rows());
+ _cur_probe_row_visited_flags.resize(block->rows());
+ std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 0);
+ _left_block_pos = 0;
_need_more_input_data = false;
_left_side_eos = eos;
@@ -287,22 +289,24 @@ void VNestedLoopJoinNode::_append_left_data_with_null(MutableBlock& mutable_bloc
DCHECK(_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN);
assert_cast<ColumnNullable*>(dst_columns[i].get())
->get_nested_column_ptr()
- ->insert_many_from(*src_column.column, _left_block_pos, 1);
+ ->insert_range_from(*src_column.column, _left_block_start_pos,
+ _left_side_process_count);
assert_cast<ColumnNullable*>(dst_columns[i].get())
->get_null_map_column()
.get_data()
.resize_fill(origin_sz + 1, 0);
} else {
- dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1);
+ dst_columns[i]->insert_range_from(*src_column.column, _left_block_start_pos,
+ _left_side_process_count);
}
}
for (size_t i = 0; i < _num_build_side_columns; ++i) {
- dst_columns[_num_probe_side_columns + i]->insert_default();
+ dst_columns[_num_probe_side_columns + i]->insert_many_defaults(_left_side_process_count);
}
IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
*dst_columns[dst_columns.size() - 1])
.get_data();
- mark_data.resize_fill(mark_data.size() + 1, 0);
+ mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0);
}
void VNestedLoopJoinNode::_process_left_child_block(MutableBlock& mutable_block,
@@ -457,54 +461,58 @@ void VNestedLoopJoinNode::_finalize_current_phase(MutableBlock& mutable_block, s
}
_output_null_idx_build_side = i;
} else {
- if constexpr (IsSemi) {
- if (!_cur_probe_row_visited_flags && !_is_mark_join) {
- return;
+ if (!_is_mark_join) {
+ auto new_size = column_size;
+ DCHECK_LE(_left_block_start_pos + _left_side_process_count, _left_block.rows());
+ for (int j = _left_block_start_pos;
+ j < _left_block_start_pos + _left_side_process_count; ++j) {
+ if (_cur_probe_row_visited_flags[j] == IsSemi) {
+ new_size++;
+ for (size_t i = 0; i < _num_probe_side_columns; ++i) {
+ const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
+ if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
+ DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
+ assert_cast<ColumnNullable*>(dst_columns[i].get())
+ ->get_nested_column_ptr()
+ ->insert_many_from(*src_column.column, j, 1);
+ assert_cast<ColumnNullable*>(dst_columns[i].get())
+ ->get_null_map_column()
+ .get_data()
+ .resize_fill(new_size, 0);
+ } else {
+ dst_columns[i]->insert_many_from(*src_column.column, j, 1);
+ }
+ }
+ }
}
- } else {
- if (_cur_probe_row_visited_flags && !_is_mark_join) {
- return;
+ if (new_size > column_size) {
+ for (size_t i = 0; i < _num_build_side_columns; ++i) {
+ dst_columns[_num_probe_side_columns + i]->insert_many_defaults(new_size -
+ column_size);
+ }
+ _resize_fill_tuple_is_null_column(new_size, 0, 1);
}
- }
-
- auto new_size = column_size + 1;
- if (_is_mark_join) {
+ } else {
IColumn::Filter& mark_data = assert_cast<doris::vectorized::ColumnVector<UInt8>&>(
*dst_columns[dst_columns.size() - 1])
.get_data();
- mark_data.resize_fill(mark_data.size() + 1,
- (IsSemi && !_cur_probe_row_visited_flags) ||
- (!IsSemi && _cur_probe_row_visited_flags)
- ? 0
- : 1);
- }
-
- DCHECK_LT(_left_block_pos, _left_block.rows());
- for (size_t i = 0; i < _num_probe_side_columns; ++i) {
- const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
- if (!src_column.column->is_nullable() && dst_columns[i]->is_nullable()) {
- DCHECK(_join_op == TJoinOp::FULL_OUTER_JOIN);
- assert_cast<ColumnNullable*>(dst_columns[i].get())
- ->get_nested_column_ptr()
- ->insert_many_from(*src_column.column, _left_block_pos, 1);
- assert_cast<ColumnNullable*>(dst_columns[i].get())
- ->get_null_map_column()
- .get_data()
- .resize_fill(new_size, 0);
- } else {
- dst_columns[i]->insert_many_from(*src_column.column, _left_block_pos, 1);
+ mark_data.reserve(mark_data.size() + _left_side_process_count);
+ DCHECK_LT(_left_block_pos, _left_block.rows());
+ for (int j = _left_block_start_pos;
+ j < _left_block_start_pos + _left_side_process_count; ++j) {
+ mark_data.emplace_back(IsSemi != _cur_probe_row_visited_flags[j]);
+ for (size_t i = 0; i < _num_probe_side_columns; ++i) {
+ const ColumnWithTypeAndName src_column = _left_block.get_by_position(i);
+ DCHECK(_join_op != TJoinOp::FULL_OUTER_JOIN);
+ dst_columns[i]->insert_from(*src_column.column, j);
+ }
}
}
- for (size_t i = 0; i < _num_build_side_columns; ++i) {
- dst_columns[_num_probe_side_columns + i]->insert_default();
- }
- _resize_fill_tuple_is_null_column(new_size, 0, 1);
}
}
void VNestedLoopJoinNode::_reset_with_next_probe_row() {
// TODO: need a vector of left block to register the _probe_row_visited_flags
- _cur_probe_row_visited_flags = false;
_current_build_pos = 0;
_left_block_pos++;
}
@@ -526,8 +534,8 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
->get_data();
auto* __restrict build_side_flag_data = build_side_flag.data();
auto cur_sz = build_side_flag.size();
- const size_t offset = _offset_stack.top();
- _offset_stack.pop();
+ const size_t offset = _build_offset_stack.top();
+ _build_offset_stack.pop();
for (size_t j = 0; j < cur_sz; j++) {
build_side_flag_data[j] |= filter[offset + j];
}
@@ -535,7 +543,20 @@ void VNestedLoopJoinNode::_do_filtering_and_update_visited_flags_impl(
}
}
if constexpr (SetProbeSideFlag) {
- _cur_probe_row_visited_flags |= simd::contain_byte<uint8>(filter.data(), filter.size(), 1);
+ int end = filter.size();
+ for (int i = _left_block_pos == _left_block.rows() ? _left_block_pos - 1 : _left_block_pos;
+ i >= _left_block_start_pos; i--) {
+ int offset = 0;
+ if (!_probe_offset_stack.empty()) {
+ offset = _probe_offset_stack.top();
+ _probe_offset_stack.pop();
+ }
+ if (!_cur_probe_row_visited_flags[i]) {
+ _cur_probe_row_visited_flags[i] =
+ simd::contain_byte<uint8>(filter.data() + offset, end - offset, 1) ? 1 : 0;
+ }
+ end = offset;
+ }
}
if (materialize) {
Block::filter_block_internal(block, filter, column_to_keep);
@@ -554,7 +575,7 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block,
// 3. Use bool column to do filtering.
size_t build_block_idx =
_current_build_pos == 0 ? _build_blocks.size() - 1 : _current_build_pos - 1;
- size_t processed_blocks_num = _offset_stack.size();
+ size_t processed_blocks_num = _build_offset_stack.size();
if (LIKELY(!_join_conjuncts.empty() && block->rows() > 0)) {
IColumn::Filter filter(block->rows(), 1);
bool can_filter_all = false;
@@ -563,6 +584,11 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block,
if (can_filter_all) {
CLEAR_BLOCK
+ std::stack<uint16_t> empty1;
+ _probe_offset_stack.swap(empty1);
+
+ std::stack<uint16_t> empty2;
+ _build_offset_stack.swap(empty2);
} else {
_do_filtering_and_update_visited_flags_impl<decltype(filter), SetBuildSideFlag,
SetProbeSideFlag>(
@@ -577,14 +603,16 @@ Status VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(Block* block,
->get_data();
auto* __restrict build_side_flag_data = build_side_flag.data();
auto cur_sz = build_side_flag.size();
- _offset_stack.pop();
+ _build_offset_stack.pop();
memset(reinterpret_cast<void*>(build_side_flag_data), 1, cur_sz);
build_block_idx =
build_block_idx == 0 ? _build_blocks.size() - 1 : build_block_idx - 1;
}
}
if constexpr (SetProbeSideFlag) {
- _cur_probe_row_visited_flags = true;
+ std::stack<uint16_t> empty;
+ _probe_offset_stack.swap(empty);
+ std::fill(_cur_probe_row_visited_flags.begin(), _cur_probe_row_visited_flags.end(), 1);
}
if (!materialize) {
CLEAR_BLOCK
@@ -676,7 +704,7 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state, vectorized::Block* block,
}
bool VNestedLoopJoinNode::need_more_input_data() const {
- return _need_more_input_data and !_left_side_eos;
+ return _need_more_input_data and !_left_side_eos and _join_block.rows() == 0;
}
void VNestedLoopJoinNode::release_resource(doris::RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h b/be/src/vec/exec/join/vnested_loop_join_node.h
index 4bd66798d9..03676629bc 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -94,31 +94,46 @@ private:
constexpr bool ignore_null = JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
JoinOpType::value == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType::value == TJoinOp::RIGHT_ANTI_JOIN;
+ _left_block_start_pos = _left_block_pos;
+ _left_side_process_count = 0;
+ DCHECK(!_need_more_input_data || !_matched_rows_done);
MutableBlock mutable_join_block(&_join_block);
+ if (!_matched_rows_done && !_need_more_input_data) {
+ // We should try to join rows if there still are some rows from probe side.
+ while (_join_block.rows() < state->batch_size()) {
+ while (_current_build_pos == _build_blocks.size() ||
+ _left_block_pos == _left_block.rows()) {
+ // If the left block is empty, there are no left block rows that need to be processed.
+ if (_left_block.rows() > _left_block_pos) {
+ _left_side_process_count++;
+ }
- while (_join_block.rows() < state->batch_size() && !_matched_rows_done) {
- // If this left block is exhausted or empty, we need to pull data from left child.
- if (_left_block_pos == _left_block.rows()) {
- if (_left_side_eos) {
- _matched_rows_done = true;
- } else {
- _left_block_pos = 0;
- _need_more_input_data = true;
- return Status::OK();
+ _reset_with_next_probe_row();
+ if (_left_block_pos < _left_block.rows()) {
+ if constexpr (set_probe_side_flag) {
+ _probe_offset_stack.push(mutable_join_block.rows());
+ }
+ } else {
+ if (_left_side_eos) {
+ _matched_rows_done = true;
+ } else {
+ _need_more_input_data = true;
+ }
+ break;
+ }
}
- }
- // We should try to join rows if there still are some rows from probe side.
- if (!_matched_rows_done && _current_build_pos < _build_blocks.size()) {
- do {
- const auto& now_process_build_block = _build_blocks[_current_build_pos++];
- if constexpr (set_build_side_flag) {
- _offset_stack.push(mutable_join_block.rows());
- }
- _process_left_child_block(mutable_join_block, now_process_build_block);
- } while (_join_block.rows() < state->batch_size() &&
- _current_build_pos < _build_blocks.size());
+ // No left rows remain that need to be processed.
+ if (_matched_rows_done || _need_more_input_data) {
+ break;
+ }
+
+ const auto& now_process_build_block = _build_blocks[_current_build_pos++];
+ if constexpr (set_build_side_flag) {
+ _build_offset_stack.push(mutable_join_block.rows());
+ }
+ _process_left_child_block(mutable_join_block, now_process_build_block);
}
if constexpr (set_probe_side_flag) {
@@ -133,28 +148,20 @@ private:
}
mutable_join_block = MutableBlock(&_join_block);
// If this join operation is left outer join or full outer join, when
- // `_current_build_pos == _build_blocks.size()`, means all rows from build
- // side have been joined with the current probe row, we should output current
+ // `_left_side_process_count` is non-zero, it means all rows from build
+ // side have been joined with that many probe rows, so we should output each unmatched
// probe row with null from build side.
- if (_current_build_pos == _build_blocks.size()) {
- if (!_matched_rows_done) {
- _finalize_current_phase<false,
- JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>(
- mutable_join_block, state->batch_size());
- _reset_with_next_probe_row();
- }
- break;
+ if (_left_side_process_count) {
+ _finalize_current_phase<false, JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN>(
+ mutable_join_block, state->batch_size());
}
}
- if (!_matched_rows_done && _current_build_pos == _build_blocks.size()) {
+ if (_left_side_process_count) {
if (_is_mark_join && _build_blocks.empty()) {
DCHECK_EQ(JoinOpType::value, TJoinOp::CROSS_JOIN);
_append_left_data_with_null(mutable_join_block);
- _reset_with_next_probe_row();
- break;
}
- _reset_with_next_probe_row();
}
}
@@ -221,7 +228,7 @@ private:
// Visited flags for each row in build side.
MutableColumns _build_side_visited_flags;
// Visited flags for current row in probe side.
- bool _cur_probe_row_visited_flags;
+ std::vector<int8_t> _cur_probe_row_visited_flags;
size_t _current_build_pos = 0;
size_t _num_probe_side_columns = 0;
@@ -238,8 +245,10 @@ private:
// is responsible for.
Block _left_block;
+ int _left_block_start_pos = 0;
int _left_block_pos; // current scan pos in _left_block
bool _left_side_eos; // if true, left child has no more rows to process
+ int _left_side_process_count = 0;
bool _old_version_flag;
@@ -249,7 +258,8 @@ private:
VExprContextSPtrs _filter_src_expr_ctxs;
bool _is_output_left_side_only = false;
bool _need_more_input_data = true;
- std::stack<uint16_t> _offset_stack;
+ std::stack<uint16_t> _build_offset_stack;
+ std::stack<uint16_t> _probe_offset_stack;
VExprContextSPtrs _join_conjuncts;
friend struct RuntimeFilterBuild;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org