You are viewing a plain-text version of this content; the canonical (linked) version is available in the mailing-list archive.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/21 11:28:05 UTC
[doris] 02/18: [refactor](mem_reuse) refactor mem_reuse in MutableBlock (#21564)
This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 24cce35bd59607e248457214d347c28a1a755b11
Author: Mryange <59...@users.noreply.github.com>
AuthorDate: Thu Jul 20 22:53:19 2023 +0800
[refactor](mem_reuse) refactor mem_reuse in MutableBlock (#21564)
---
be/src/exec/exec_node.cpp | 9 ++------
be/src/vec/common/sort/partition_sorter.cpp | 11 +++-------
be/src/vec/common/sort/sorter.cpp | 12 ++++-------
be/src/vec/exec/vrepeat_node.cpp | 27 +++++------------------
be/src/vec/exec/vtable_function_node.cpp | 31 ++++++---------------------
be/src/vec/exec/vunion_node.cpp | 31 ++++-----------------------
be/src/vec/runtime/vsorted_run_merger.cpp | 12 ++++-------
be/src/vec/utils/util.hpp | 33 ++++++++++++++++++++++++++++-
8 files changed, 60 insertions(+), 106 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 938cbc2608..c6b7826deb 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -528,11 +528,8 @@ std::string ExecNode::get_name() {
Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
SCOPED_TIMER(_projection_timer);
using namespace vectorized;
- auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
- is_mem_reuse ? MutableBlock(output_block)
- : MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
- *_output_row_descriptor));
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
auto rows = origin_block->rows();
if (rows != 0) {
@@ -552,9 +549,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
}
}
-
- if (!is_mem_reuse) output_block->swap(mutable_block.to_block());
- DCHECK(output_block->rows() == rows);
+ DCHECK(mutable_block.rows() == rows);
}
return Status::OK();
diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp
index ca29d62eb4..1bffb5ed76 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -94,10 +94,9 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
const auto& sorted_block = _state->get_sorted_block()[0];
size_t num_columns = sorted_block.columns();
- bool mem_reuse = output_block->mem_reuse();
- MutableColumns merged_columns =
- mem_reuse ? output_block->mutate_columns() : sorted_block.clone_empty_columns();
-
+ MutableBlock m_block =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
+ MutableColumns& merged_columns = m_block.mutable_columns();
size_t current_output_rows = 0;
auto& priority_queue = _state->get_priority_queue();
@@ -189,10 +188,6 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
}
}
- if (!mem_reuse) {
- Block merge_block = sorted_block.clone_with_columns(std::move(merged_columns));
- merge_block.swap(*output_block);
- }
_output_total_rows += output_block->rows();
if (current_output_rows == 0 || get_enough_data == true) {
*eos = true;
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index 1254b311a0..ae8868ff2a 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -32,6 +32,7 @@
#include "runtime/thread_context.h"
#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
+#include "vec/core/block.h"
#include "vec/core/block_spill_reader.h"
#include "vec/core/block_spill_writer.h"
#include "vec/core/column_with_type_and_name.h"
@@ -39,6 +40,7 @@
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
namespace doris {
class RowDescriptor;
@@ -160,9 +162,8 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
doris::vectorized::Block* block, bool* eos) {
size_t num_columns = sorted_blocks_[0].columns();
- bool mem_reuse = block->mem_reuse();
- MutableColumns merged_columns =
- mem_reuse ? block->mutate_columns() : sorted_blocks_[0].clone_empty_columns();
+ MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
+ MutableColumns& merged_columns = m_block.mutable_columns();
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
@@ -191,11 +192,6 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
return Status::OK();
}
- if (!mem_reuse) {
- Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns));
- merge_block.swap(*block);
- }
-
return Status::OK();
}
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index d7499de803..58c993c8b3 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -38,11 +38,13 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
namespace doris::vectorized {
VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -102,17 +104,10 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl
size_t child_column_size = child_block->columns();
size_t column_size = _output_slots.size();
- bool mem_reuse = output_block->mem_reuse();
DCHECK_LT(child_column_size, column_size);
- std::vector<vectorized::MutableColumnPtr> columns(column_size);
- for (size_t i = 0; i < column_size; i++) {
- if (mem_reuse) {
- columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
- } else {
- columns[i] = _output_slots[i]->get_empty_mutable_column();
- }
- }
-
+ MutableBlock m_block =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
+ MutableColumns& columns = m_block.mutable_columns();
/* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2));
* insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1);
* slot_id_set_list=[[0],[1]],repeat_id_idx=0,
@@ -173,18 +168,6 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl
DCHECK_EQ(cur_col, column_size);
- if (!columns.empty() && !columns[0]->empty()) {
- auto n_columns = 0;
- if (!mem_reuse) {
- for (const auto slot_desc : _output_slots) {
- output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
- slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
- }
- } else {
- columns.clear();
- }
- }
return Status::OK();
}
diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp
index d3e967afcc..81fdef4389 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -29,10 +29,13 @@
#include <string>
#include <utility>
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
#include "vec/exprs/table_function/table_function.h"
#include "vec/exprs/table_function/table_function_factory.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
namespace doris {
class ObjectPool;
@@ -153,18 +156,9 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block,
bool* eos) {
- size_t column_size = _output_slots.size();
- bool mem_reuse = output_block->mem_reuse();
-
- std::vector<MutableColumnPtr> columns(column_size);
- for (size_t i = 0; i < column_size; i++) {
- if (mem_reuse) {
- columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
- } else {
- columns[i] = _output_slots[i]->get_empty_mutable_column();
- }
- }
-
+ MutableBlock m_block =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
+ MutableColumns& columns = m_block.mutable_columns();
for (int i = 0; i < _fn_num; i++) {
if (columns[i + _child_slots.size()]->is_nullable()) {
_fns[i]->set_nullable();
@@ -222,19 +216,6 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
columns[index]->insert_many_defaults(row_size - columns[index]->size());
}
- if (!columns.empty() && !columns[0]->empty()) {
- auto n_columns = 0;
- if (!mem_reuse) {
- for (const auto slot_desc : _output_slots) {
- output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
- slot_desc->get_data_type_ptr(),
- slot_desc->col_name()));
- }
- } else {
- columns.clear();
- }
- }
-
// 3. eval conjuncts
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index 7c079a3b46..e9caa494fd 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -151,11 +151,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
DCHECK(!reached_limit());
DCHECK_LT(_child_idx, _children.size());
- bool mem_reuse = block->mem_reuse();
- MutableBlock mblock =
- mem_reuse ? MutableBlock::build_mutable_block(block)
- : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
- _row_descriptor)));
+ MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);
Block child_block;
while (has_more_materialized() && mblock.rows() <= state->batch_size()) {
@@ -202,10 +198,6 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
}
}
- if (!mem_reuse) {
- block->swap(mblock.to_block());
- }
-
DCHECK_LE(_child_idx, _children.size());
return Status::OK();
}
@@ -214,11 +206,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
DCHECK_EQ(state->per_fragment_instance_idx(), 0);
DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size());
- bool mem_reuse = block->mem_reuse();
- MutableBlock mblock =
- mem_reuse ? MutableBlock::build_mutable_block(block)
- : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
- _row_descriptor)));
+ MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);
for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size();
++_const_expr_list_idx) {
Block tmp_block;
@@ -237,10 +225,6 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
}
}
- if (!mem_reuse) {
- block->swap(mblock.to_block());
- }
-
// some insert query like "insert into string_test select 1, repeat('a', 1024 * 1024);"
// the const expr will be in output expr cause the union node return a empty block. so here we
// need add one row to make sure the union node exec const expr return at least one row
@@ -257,19 +241,12 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id,
vectorized::Block* output_block) {
DCHECK_LT(child_id, _children.size());
DCHECK(!is_child_passthrough(child_id));
- bool mem_reuse = output_block->mem_reuse();
- MutableBlock mblock =
- mem_reuse ? MutableBlock::build_mutable_block(output_block)
- : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
- _row_descriptor)));
-
if (input_block->rows() > 0) {
+ MutableBlock mblock =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, _row_descriptor);
Block res;
RETURN_IF_ERROR(materialize_block(input_block, child_id, &res));
RETURN_IF_ERROR(mblock.merge(res));
- if (!mem_reuse) {
- output_block->swap(mblock.to_block());
- }
}
return Status::OK();
}
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp
index be7397ee0a..c45004081b 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -24,6 +24,7 @@
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
#include "vec/core/column_with_type_and_name.h"
+#include "vec/utils/util.hpp"
namespace doris {
namespace vectorized {
@@ -129,9 +130,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
}
} else {
size_t num_columns = _empty_block.columns();
- bool mem_reuse = output_block->mem_reuse();
- MutableColumns merged_columns =
- mem_reuse ? output_block->mutate_columns() : _empty_block.clone_empty_columns();
+ MutableBlock m_block =
+ VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block);
+ MutableColumns& merged_columns = m_block.mutable_columns();
/// Take rows from queue in right order and push to 'merged'.
size_t merged_rows = 0;
@@ -154,11 +155,6 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
*eos = true;
return Status::OK();
}
-
- if (!mem_reuse) {
- Block merge_block = _empty_block.clone_with_columns(std::move(merged_columns));
- merge_block.swap(*output_block);
- }
}
_num_rows_returned += output_block->rows();
diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp
index ba593c60cb..a57e5c1705 100644
--- a/be/src/vec/utils/util.hpp
+++ b/be/src/vec/utils/util.hpp
@@ -22,6 +22,7 @@
#include <boost/shared_ptr.hpp>
#include "runtime/descriptors.h"
+#include "vec/columns/column.h"
#include "vec/columns/column_nullable.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr.h"
@@ -34,7 +35,37 @@ public:
// Block block;
return create_columns_with_type_and_name(row_desc);
}
-
+    // Returns a MutableBlock over `block`; if `block` is not mem-reusable, it is first
+    // replaced by an empty block with the columns (types/names) described by `row_desc`.
+    static MutableBlock build_mutable_mem_reuse_block(Block* block, const RowDescriptor& row_desc) {
+        if (!block->mem_reuse()) {
+            MutableBlock tmp(VectorizedUtils::create_columns_with_type_and_name(row_desc));
+            block->swap(tmp.to_block());
+        }
+        return MutableBlock::build_mutable_block(block);
+    }
+    // Same as the RowDescriptor overload, except a non-reusable `block` is replaced by
+    // `other.clone_empty()`.
+    static MutableBlock build_mutable_mem_reuse_block(Block* block, const Block& other) {
+        if (!block->mem_reuse()) {
+            MutableBlock tmp(other.clone_empty());
+            block->swap(tmp.to_block());
+        }
+        return MutableBlock::build_mutable_block(block);
+    }
+    // Slot-based variant: when `block` is not mem-reusable, append one empty column per
+    // slot (typed and named from its descriptor). `slots` is read-only, hence const&.
+    static MutableBlock build_mutable_mem_reuse_block(Block* block,
+                                                      const std::vector<SlotDescriptor*>& slots) {
+        if (!block->mem_reuse()) {
+            for (auto* slot_desc : slots) {
+                block->insert(ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        }
+        return MutableBlock::build_mutable_block(block);
+    }
static ColumnsWithTypeAndName create_columns_with_type_and_name(
const RowDescriptor& row_desc, bool ignore_trivial_slot = true) {
ColumnsWithTypeAndName columns_with_type_and_name;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org