You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/07/21 11:28:05 UTC

[doris] 02/18: [refactor](mem_reuse) refactor mem_reuse in MutableBlock (#21564)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 24cce35bd59607e248457214d347c28a1a755b11
Author: Mryange <59...@users.noreply.github.com>
AuthorDate: Thu Jul 20 22:53:19 2023 +0800

    [refactor](mem_reuse) refactor mem_reuse in MutableBlock (#21564)
---
 be/src/exec/exec_node.cpp                   |  9 ++------
 be/src/vec/common/sort/partition_sorter.cpp | 11 +++-------
 be/src/vec/common/sort/sorter.cpp           | 12 ++++-------
 be/src/vec/exec/vrepeat_node.cpp            | 27 +++++------------------
 be/src/vec/exec/vtable_function_node.cpp    | 31 ++++++---------------------
 be/src/vec/exec/vunion_node.cpp             | 31 ++++-----------------------
 be/src/vec/runtime/vsorted_run_merger.cpp   | 12 ++++-------
 be/src/vec/utils/util.hpp                   | 33 ++++++++++++++++++++++++++++-
 8 files changed, 60 insertions(+), 106 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 938cbc2608..c6b7826deb 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -528,11 +528,8 @@ std::string ExecNode::get_name() {
 Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Block* output_block) {
     SCOPED_TIMER(_projection_timer);
     using namespace vectorized;
-    auto is_mem_reuse = output_block->mem_reuse();
     MutableBlock mutable_block =
-            is_mem_reuse ? MutableBlock(output_block)
-                         : MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
-                                   *_output_row_descriptor));
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, *_output_row_descriptor);
     auto rows = origin_block->rows();
 
     if (rows != 0) {
@@ -552,9 +549,7 @@ Status ExecNode::do_projections(vectorized::Block* origin_block, vectorized::Blo
                 mutable_columns[i]->insert_range_from(*column_ptr, 0, rows);
             }
         }
-
-        if (!is_mem_reuse) output_block->swap(mutable_block.to_block());
-        DCHECK(output_block->rows() == rows);
+        DCHECK(mutable_block.rows() == rows);
     }
 
     return Status::OK();
diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp
index ca29d62eb4..1bffb5ed76 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -94,10 +94,9 @@ Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
 Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
     const auto& sorted_block = _state->get_sorted_block()[0];
     size_t num_columns = sorted_block.columns();
-    bool mem_reuse = output_block->mem_reuse();
-    MutableColumns merged_columns =
-            mem_reuse ? output_block->mutate_columns() : sorted_block.clone_empty_columns();
-
+    MutableBlock m_block =
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
+    MutableColumns& merged_columns = m_block.mutable_columns();
     size_t current_output_rows = 0;
     auto& priority_queue = _state->get_priority_queue();
 
@@ -189,10 +188,6 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
         }
     }
 
-    if (!mem_reuse) {
-        Block merge_block = sorted_block.clone_with_columns(std::move(merged_columns));
-        merge_block.swap(*output_block);
-    }
     _output_total_rows += output_block->rows();
     if (current_output_rows == 0 || get_enough_data == true) {
         *eos = true;
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index 1254b311a0..ae8868ff2a 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -32,6 +32,7 @@
 #include "runtime/thread_context.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/core/block.h"
 #include "vec/core/block_spill_reader.h"
 #include "vec/core/block_spill_writer.h"
 #include "vec/core/column_with_type_and_name.h"
@@ -39,6 +40,7 @@
 #include "vec/data_types/data_type.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
 
 namespace doris {
 class RowDescriptor;
@@ -160,9 +162,8 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
                                                       doris::vectorized::Block* block, bool* eos) {
     size_t num_columns = sorted_blocks_[0].columns();
 
-    bool mem_reuse = block->mem_reuse();
-    MutableColumns merged_columns =
-            mem_reuse ? block->mutate_columns() : sorted_blocks_[0].clone_empty_columns();
+    MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
+    MutableColumns& merged_columns = m_block.mutable_columns();
 
     /// Take rows from queue in right order and push to 'merged'.
     size_t merged_rows = 0;
@@ -191,11 +192,6 @@ Status MergeSorterState::_merge_sort_read_not_spilled(int batch_size,
         return Status::OK();
     }
 
-    if (!mem_reuse) {
-        Block merge_block = sorted_blocks_[0].clone_with_columns(std::move(merged_columns));
-        merge_block.swap(*block);
-    }
-
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vrepeat_node.cpp b/be/src/vec/exec/vrepeat_node.cpp
index d7499de803..58c993c8b3 100644
--- a/be/src/vec/exec/vrepeat_node.cpp
+++ b/be/src/vec/exec/vrepeat_node.cpp
@@ -38,11 +38,13 @@
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
 
 namespace doris::vectorized {
 VRepeatNode::VRepeatNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -102,17 +104,10 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl
 
     size_t child_column_size = child_block->columns();
     size_t column_size = _output_slots.size();
-    bool mem_reuse = output_block->mem_reuse();
     DCHECK_LT(child_column_size, column_size);
-    std::vector<vectorized::MutableColumnPtr> columns(column_size);
-    for (size_t i = 0; i < column_size; i++) {
-        if (mem_reuse) {
-            columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
-        } else {
-            columns[i] = _output_slots[i]->get_empty_mutable_column();
-        }
-    }
-
+    MutableBlock m_block =
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
+    MutableColumns& columns = m_block.mutable_columns();
     /* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2));
      * insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1);
      * slot_id_set_list=[[0],[1]],repeat_id_idx=0,
@@ -173,18 +168,6 @@ Status VRepeatNode::get_repeated_block(Block* child_block, int repeat_id_idx, Bl
 
     DCHECK_EQ(cur_col, column_size);
 
-    if (!columns.empty() && !columns[0]->empty()) {
-        auto n_columns = 0;
-        if (!mem_reuse) {
-            for (const auto slot_desc : _output_slots) {
-                output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                           slot_desc->get_data_type_ptr(),
-                                                           slot_desc->col_name()));
-            }
-        } else {
-            columns.clear();
-        }
-    }
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vtable_function_node.cpp b/be/src/vec/exec/vtable_function_node.cpp
index d3e967afcc..81fdef4389 100644
--- a/be/src/vec/exec/vtable_function_node.cpp
+++ b/be/src/vec/exec/vtable_function_node.cpp
@@ -29,10 +29,13 @@
 #include <string>
 #include <utility>
 
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
 #include "vec/exprs/table_function/table_function.h"
 #include "vec/exprs/table_function/table_function_factory.h"
 #include "vec/exprs/vexpr.h"
 #include "vec/exprs/vexpr_context.h"
+#include "vec/utils/util.hpp"
 
 namespace doris {
 class ObjectPool;
@@ -153,18 +156,9 @@ Status VTableFunctionNode::get_next(RuntimeState* state, Block* block, bool* eos
 
 Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* output_block,
                                                bool* eos) {
-    size_t column_size = _output_slots.size();
-    bool mem_reuse = output_block->mem_reuse();
-
-    std::vector<MutableColumnPtr> columns(column_size);
-    for (size_t i = 0; i < column_size; i++) {
-        if (mem_reuse) {
-            columns[i] = std::move(*output_block->get_by_position(i).column).mutate();
-        } else {
-            columns[i] = _output_slots[i]->get_empty_mutable_column();
-        }
-    }
-
+    MutableBlock m_block =
+            VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
+    MutableColumns& columns = m_block.mutable_columns();
     for (int i = 0; i < _fn_num; i++) {
         if (columns[i + _child_slots.size()]->is_nullable()) {
             _fns[i]->set_nullable();
@@ -222,19 +216,6 @@ Status VTableFunctionNode::_get_expanded_block(RuntimeState* state, Block* outpu
         columns[index]->insert_many_defaults(row_size - columns[index]->size());
     }
 
-    if (!columns.empty() && !columns[0]->empty()) {
-        auto n_columns = 0;
-        if (!mem_reuse) {
-            for (const auto slot_desc : _output_slots) {
-                output_block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
-                                                           slot_desc->get_data_type_ptr(),
-                                                           slot_desc->col_name()));
-            }
-        } else {
-            columns.clear();
-        }
-    }
-
     // 3. eval conjuncts
     RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, output_block, output_block->columns()));
 
diff --git a/be/src/vec/exec/vunion_node.cpp b/be/src/vec/exec/vunion_node.cpp
index 7c079a3b46..e9caa494fd 100644
--- a/be/src/vec/exec/vunion_node.cpp
+++ b/be/src/vec/exec/vunion_node.cpp
@@ -151,11 +151,7 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
     DCHECK(!reached_limit());
     DCHECK_LT(_child_idx, _children.size());
 
-    bool mem_reuse = block->mem_reuse();
-    MutableBlock mblock =
-            mem_reuse ? MutableBlock::build_mutable_block(block)
-                      : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
-                                _row_descriptor)));
+    MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);
 
     Block child_block;
     while (has_more_materialized() && mblock.rows() <= state->batch_size()) {
@@ -202,10 +198,6 @@ Status VUnionNode::get_next_materialized(RuntimeState* state, Block* block) {
         }
     }
 
-    if (!mem_reuse) {
-        block->swap(mblock.to_block());
-    }
-
     DCHECK_LE(_child_idx, _children.size());
     return Status::OK();
 }
@@ -214,11 +206,7 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
     DCHECK_EQ(state->per_fragment_instance_idx(), 0);
     DCHECK_LT(_const_expr_list_idx, _const_expr_lists.size());
 
-    bool mem_reuse = block->mem_reuse();
-    MutableBlock mblock =
-            mem_reuse ? MutableBlock::build_mutable_block(block)
-                      : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
-                                _row_descriptor)));
+    MutableBlock mblock = VectorizedUtils::build_mutable_mem_reuse_block(block, _row_descriptor);
     for (; _const_expr_list_idx < _const_expr_lists.size() && mblock.rows() <= state->batch_size();
          ++_const_expr_list_idx) {
         Block tmp_block;
@@ -237,10 +225,6 @@ Status VUnionNode::get_next_const(RuntimeState* state, Block* block) {
         }
     }
 
-    if (!mem_reuse) {
-        block->swap(mblock.to_block());
-    }
-
     // some insert query like "insert into string_test select 1, repeat('a', 1024 * 1024);"
     // the const expr will be in output expr cause the union node return a empty block. so here we
     // need add one row to make sure the union node exec const expr return at least one row
@@ -257,19 +241,12 @@ Status VUnionNode::materialize_child_block(RuntimeState* state, int child_id,
                                            vectorized::Block* output_block) {
     DCHECK_LT(child_id, _children.size());
     DCHECK(!is_child_passthrough(child_id));
-    bool mem_reuse = output_block->mem_reuse();
-    MutableBlock mblock =
-            mem_reuse ? MutableBlock::build_mutable_block(output_block)
-                      : MutableBlock(Block(VectorizedUtils::create_columns_with_type_and_name(
-                                _row_descriptor)));
-
     if (input_block->rows() > 0) {
+        MutableBlock mblock =
+                VectorizedUtils::build_mutable_mem_reuse_block(output_block, _row_descriptor);
         Block res;
         RETURN_IF_ERROR(materialize_block(input_block, child_id, &res));
         RETURN_IF_ERROR(mblock.merge(res));
-        if (!mem_reuse) {
-            output_block->swap(mblock.to_block());
-        }
     }
     return Status::OK();
 }
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp
index be7397ee0a..c45004081b 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -24,6 +24,7 @@
 #include "util/stopwatch.hpp"
 #include "vec/columns/column.h"
 #include "vec/core/column_with_type_and_name.h"
+#include "vec/utils/util.hpp"
 
 namespace doris {
 namespace vectorized {
@@ -129,9 +130,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
         }
     } else {
         size_t num_columns = _empty_block.columns();
-        bool mem_reuse = output_block->mem_reuse();
-        MutableColumns merged_columns =
-                mem_reuse ? output_block->mutate_columns() : _empty_block.clone_empty_columns();
+        MutableBlock m_block =
+                VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block);
+        MutableColumns& merged_columns = m_block.mutable_columns();
 
         /// Take rows from queue in right order and push to 'merged'.
         size_t merged_rows = 0;
@@ -154,11 +155,6 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) {
             *eos = true;
             return Status::OK();
         }
-
-        if (!mem_reuse) {
-            Block merge_block = _empty_block.clone_with_columns(std::move(merged_columns));
-            merge_block.swap(*output_block);
-        }
     }
 
     _num_rows_returned += output_block->rows();
diff --git a/be/src/vec/utils/util.hpp b/be/src/vec/utils/util.hpp
index ba593c60cb..a57e5c1705 100644
--- a/be/src/vec/utils/util.hpp
+++ b/be/src/vec/utils/util.hpp
@@ -22,6 +22,7 @@
 #include <boost/shared_ptr.hpp>
 
 #include "runtime/descriptors.h"
+#include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
@@ -34,7 +35,37 @@ public:
         // Block block;
         return create_columns_with_type_and_name(row_desc);
     }
-
+    static MutableBlock build_mutable_mem_reuse_block(Block* block, const RowDescriptor& row_desc) {
+        if (!block->mem_reuse()) {
+            MutableBlock tmp(VectorizedUtils::create_columns_with_type_and_name(row_desc));
+            block->swap(tmp.to_block());
+        }
+        return MutableBlock::build_mutable_block(block);
+    }
+    static MutableBlock build_mutable_mem_reuse_block(Block* block, const Block& other) {
+        if (!block->mem_reuse()) {
+            MutableBlock tmp(other.clone_empty());
+            block->swap(tmp.to_block());
+        }
+        return MutableBlock::build_mutable_block(block);
+    }
+    static MutableBlock build_mutable_mem_reuse_block(Block* block,
+                                                      std::vector<SlotDescriptor*>& slots) {
+        if (!block->mem_reuse()) {
+            size_t column_size = slots.size();
+            MutableColumns columns(column_size);
+            for (size_t i = 0; i < column_size; i++) {
+                columns[i] = slots[i]->get_empty_mutable_column();
+            }
+            int n_columns = 0;
+            for (const auto slot_desc : slots) {
+                block->insert(ColumnWithTypeAndName(std::move(columns[n_columns++]),
+                                                    slot_desc->get_data_type_ptr(),
+                                                    slot_desc->col_name()));
+            }
+        }
+        return MutableBlock(block);
+    }
     static ColumnsWithTypeAndName create_columns_with_type_and_name(
             const RowDescriptor& row_desc, bool ignore_trivial_slot = true) {
         ColumnsWithTypeAndName columns_with_type_and_name;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org