You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by we...@apache.org on 2022/06/21 12:55:34 UTC

[doris] 01/05: [feature-wip](stream-load-vec) opt memtable

This is an automated email from the ASF dual-hosted git repository.

weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0d3880b41586fe8f159a3c8ab8a395f65cfe0ef7
Author: weixiang <we...@meituan.com>
AuthorDate: Mon Apr 18 21:37:34 2022 +0800

    [feature-wip](stream-load-vec) opt memtable
---
 be/src/olap/delta_writer.cpp                       |   7 +-
 be/src/olap/memtable.cpp                           | 147 +++++++++++++++--
 be/src/olap/memtable.h                             |  43 ++++-
 be/src/vec/CMakeLists.txt                          |   1 +
 .../vec/aggregate_functions/block_aggregator.cpp   | 180 +++++++++++++++++++++
 be/src/vec/aggregate_functions/block_aggregator.h  |  70 ++++++++
 be/src/vec/core/block.cpp                          |  20 +++
 be/src/vec/core/block.h                            |  13 ++
 8 files changed, 461 insertions(+), 20 deletions(-)

diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 11ee242449..5cf0c9cd22 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -191,11 +191,14 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
     }
 
     int start = 0, end = 0;
+    bool flush = false;
     const size_t num_rows = row_idxs.size();
     for (; start < num_rows;) {
         auto count = end + 1 - start;
         if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
-            _mem_table->insert(block, row_idxs[start], count);
+            if (_mem_table->insert(block, row_idxs[start], count)) {
+                flush = true;
+            }
             start += count;
             end = start;
         } else {
@@ -203,7 +206,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         }
     }
 
-    if (_mem_table->memory_usage() >= config::write_buffer_size) {
+    if (flush || _mem_table->is_full()) {
         RETURN_NOT_OK(_flush_memtable_async());
         _reset_mem_table();
     }
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c7f94ead4f..e1c97f2d4d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -53,6 +53,8 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
         // TODO: Support ZOrderComparator in the future
         _vec_skip_list = std::make_unique<VecTable>(
                 _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
+        _block_aggregator =
+                std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true);
     } else {
         _vec_skip_list = nullptr;
         if (_keys_type == KeysType::DUP_KEYS) {
@@ -114,27 +116,135 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
                                *_pblock, -1);
 }
 
-void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
+bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
     if (_is_first_insertion) {
         _is_first_insertion = false;
         auto cloneBlock = block->clone_without_columns();
-        _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        _vec_row_comparator->set_block(&_input_mutable_block);
-        _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+        _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock);
         if (_keys_type != KeysType::DUP_KEYS) {
             _init_agg_functions(block);
         }
     }
-    size_t cursor_in_mutableblock = _input_mutable_block.rows();
-    size_t oldsize = _input_mutable_block.allocated_bytes();
-    _input_mutable_block.add_rows(block, row_pos, num_rows);
-    size_t newsize = _input_mutable_block.allocated_bytes();
-    _mem_usage += newsize - oldsize;
-    _mem_tracker->consume(newsize - oldsize);
-
-    for (int i = 0; i < num_rows; i++) {
-        _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
-        _insert_one_row_from_block(_row_in_blocks.back());
+    _block->add_rows(block, row_pos, num_rows); // NOTE(review): _mem_tracker is no longer consumed here - confirm
+    _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows();
+    // (Above: the bytes added are estimated pro rata from the source block's allocation.)
+    // When the memtable is full, first try to shrink it by merging (sort + aggregate)
+    // instead of flushing right away; flush only if it is still full afterwards, or if
+    // the first merge left at least 2/3 of the pre-merge bytes (little gained by merging).
+    bool is_flush = false;
+    if (is_full()) {
+        size_t before_merge_bytes = bytes_allocated();
+        _merge();
+        size_t after_merged_bytes = bytes_allocated();
+        // TODO(weixiang): magic number here, make it configurable later.
+        if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) {
+            is_flush = true;
+        }
+    }
+    return is_flush; // true asks the caller to flush this memtable
+}
+
+size_t MemTable::bytes_allocated() const { // estimated bytes of buffered rows + aggregator bytes
+    return _block_bytes_usage + _block_aggregator->get_bytes_usage();
+}
+
+bool MemTable::is_full() const { // true once bytes_allocated() exceeds the write buffer size
+    return bytes_allocated() > config::write_buffer_size;
+}
+
+void MemTable::_merge() {
+    // Only non-DUP_KEYS tables with a live input block are merged: sort, then aggregate.
+    if (_block != nullptr && _keys_type != KeysType::DUP_KEYS) {
+        _sort(false);
+        _agg(false);
+        ++_merge_count;
+    }
+}
+
+void MemTable::_agg(const bool finalize) {
+    // Feeds _sorted_block (filled by _sort()) to the block aggregator; no-op when it is empty.
+    if (_sorted_block == nullptr || _sorted_block->rows() <= 0) {
+        return;
+    }
+    vectorized::Block sorted_block = _sorted_block->to_block();
+    _block_aggregator->append_block(&sorted_block);
+    _block_aggregator->partial_sort_merged_aggregate();
+    if (finalize) {
+        _sorted_block.reset(); // final pass: release the sorted block
+    } else {
+        _sorted_block->clear_column_data(); // keep the block object, clear its columns
+    }
+}
+
+void MemTable::_sort(const bool finalize) { // copies _block's rows in key order into _sorted_block
+    _index_for_sort.resize(_block->rows());
+    for (uint32_t i = 0; i < _block->rows(); i++) {
+        _index_for_sort[i] = {i, i}; // {index_in_block, incoming_index}
+    }
+    // Sort the index entries, then copy the rows into a fresh block in that order.
+    _sort_block_by_rows();
+    _sorted_block = _block->create_same_struct_block(0);
+    _append_sorted_block(_block.get(), _sorted_block.get());
+    if (finalize) {
+        _block.reset(); // final pass: release the input block
+    } else {
+        _block->clear_column_data(); // NOTE(review): to_block() ran on _block above - confirm it is still usable
+    }
+    _block_bytes_usage = 0;
+}
+
+void MemTable::_sort_block_by_rows() {
+    // Orders the entries of _index_for_sort by the key columns of the rows they point to
+    // in _block. Entries whose keys compare equal are ordered by incoming_index, which
+    // makes the resulting order fully determined.
+    const auto by_key_then_arrival = [this](const MemTable::OrderedIndexItem& lhs,
+                                            const MemTable::OrderedIndexItem& rhs) {
+        const int cmp = _block->compare_at(lhs.index_in_block, rhs.index_in_block,
+                                           _schema->num_key_columns(), *_block, -1);
+        return cmp != 0 ? cmp < 0 : lhs.incoming_index < rhs.incoming_index;
+    };
+    std::sort(_index_for_sort.begin(), _index_for_sort.end(), by_key_then_arrival);
+}
+
+void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst) {
+    const size_t total_rows = src->rows(); // rows to copy, in the order of _index_for_sort
+    _sorted_index_in_block.clear();
+    _sorted_index_in_block.reserve(total_rows);
+    for (size_t pos = 0; pos != total_rows; ++pos) {
+        _sorted_index_in_block.push_back(_index_for_sort[pos].index_in_block);
+    }
+    vectorized::Block source = src->to_block(); // add_rows() below reads from a Block
+    dst->add_rows(&source, _sorted_index_in_block.data(),
+                  _sorted_index_in_block.data() + total_rows);
+}
+
+void MemTable::finalize() {
+    // TODO(weixiang): check here. Nothing to finalize if insert() never created _block.
+    if (_block == nullptr) {
+        return;
+    }
+
+    if (_keys_type != KeysType::DUP_KEYS) {
+        // agg mode: merge any rows still buffered, then take the aggregator's block as result
+        if (_block->rows() > 0) {
+            _merge();
+        }
+        if (_merge_count > 1) { // more than one merge: re-sort and re-aggregate the accumulated block
+            _block = _block_aggregator->get_partial_agged_block();
+            _block_aggregator->reset_aggregator();
+            _sort(true);
+            _agg(true);
+        } else {
+            _block.reset();
+            _sorted_block.reset();
+        }
+
+        _block_bytes_usage = 0;
+        _sorted_block = _block_aggregator->get_partial_agged_block();
+
+    } else {
+        // dup mode: only sort; _sort() leaves the rows in _sorted_block
+        _sort(true);
     }
 }
 
@@ -271,6 +381,13 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
 }
 
 Status MemTable::flush() {
+    if (!_skip_list) {
+        finalize();
+        if (_sorted_block == nullptr) {
+            return Status::OK();
+        }
+    }
+
     VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
                   << ", memsize: " << memory_usage() << ", rows: " << _rows;
     int64_t duration_ns = 0;
@@ -301,7 +418,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
             RETURN_NOT_OK(st);
         }
     } else {
-        vectorized::Block block = _collect_vskiplist_results();
+        vectorized::Block block = _sorted_block->to_block();
         // beta rowset flush parallel, segment write add block is not
         // thread safe, so use tmp variable segment_write instead of
         // member variable
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 910cd92270..daca1f71fc 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -27,6 +27,7 @@
 #include "vec/aggregate_functions/aggregate_function.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/block.h"
+#include "vec/aggregate_functions/block_aggregator.h"
 
 namespace doris {
 
@@ -48,11 +49,17 @@ public:
 
     int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
+
     std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }
 
-    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
-    // insert tuple from (row_pos) to (row_pos+num_rows)
-    void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); };
+    //insert tuple from (row_pos) to (row_pos+num_rows)
+    bool insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+
+    bool is_full() const;
+    size_t bytes_allocated() const;
+
+    void finalize();
 
     /// Flush
     Status flush();
@@ -146,6 +153,14 @@ private:
     // for vectorized
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
+    void _sort(const bool finalize);
+    void _sort_block_by_rows();
+
+    void _merge();
+
+    void _agg(const bool finalize);
+
+    void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst);
 
     int64_t _tablet_id;
     Schema* _schema;
@@ -195,13 +210,35 @@ private:
     //for vectorized
     vectorized::MutableBlock _input_mutable_block;
     vectorized::MutableBlock _output_mutable_block;
+
+    struct OrderedIndexItem {
+        uint32_t index_in_block; // row position inside _block that this entry refers to
+        uint32_t incoming_index; // tie-breaker used by the sort when the keys compare equal
+    };
+
+    using OrderedIndex = std::vector<OrderedIndexItem>;
+
+    OrderedIndex _index_for_sort;
+
+    std::vector<int> _sorted_index_in_block;
+
+    vectorized::MutableBlockPtr _block;
+
+    vectorized::MutableBlockPtr _sorted_block;
+
+    std::unique_ptr<vectorized::BlockAggregator> _block_aggregator;
+
     vectorized::Block _collect_vskiplist_results();
+
     bool _is_first_insertion;
 
     void _init_agg_functions(const vectorized::Block* block);
     std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
     std::vector<RowInBlock*> _row_in_blocks;
     size_t _mem_usage;
+    size_t _block_bytes_usage = 0;
+    size_t _agg_bytes_usage = 0;
+    int _merge_count = 0;
 }; // class MemTable
 
 inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 265d6fd884..ca768bbe3d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -39,6 +39,7 @@ set(VEC_FILES
   aggregate_functions/aggregate_function_group_concat.cpp
   aggregate_functions/aggregate_function_percentile_approx.cpp
   aggregate_functions/aggregate_function_simple_factory.cpp
+  aggregate_functions/block_aggregator.cpp
   columns/collator.cpp
   columns/column.cpp
   columns/column_array.cpp
diff --git a/be/src/vec/aggregate_functions/block_aggregator.cpp b/be/src/vec/aggregate_functions/block_aggregator.cpp
new file mode 100644
index 0000000000..201cca75e0
--- /dev/null
+++ b/be/src/vec/aggregate_functions/block_aggregator.cpp
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "block_aggregator.h"
+
+namespace doris::vectorized {
+
+BlockAggregator::BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted)
+        : _schema(schema), _tablet_schema(tablet_schema), _src_sorted(src_sorted) {
+    _init_agg_functions(); // sets the column counts and creates the per-value-column functions
+}
+
+// The destructor has no work of its own.
+BlockAggregator::~BlockAggregator() = default;
+
+void BlockAggregator::_init_agg_functions() {
+    // Cache the column counts; columns from _key_cols_num onwards are treated as values.
+    _cols_num = _schema->num_columns();
+    _key_cols_num = _schema->num_key_columns();
+    _value_cols_num = _cols_num - _key_cols_num;
+    // TODO(weixiang): save memory just use value length.
+    // _agg_functions is indexed by column id, so the key-column slots stay empty.
+    _agg_functions.resize(_cols_num);
+    _agg_places.resize(_value_cols_num);
+    auto& factory = AggregateFunctionSimpleFactory::instance();
+    for (uint32_t cid = _key_cols_num; cid < _cols_num; ++cid) {
+        // Function name: aggregation type name plus the load suffix, lower-cased as a whole.
+        std::string fn_name = TabletColumn::get_string_by_aggregation_type(
+                                      _tablet_schema->column(cid).aggregation()) +
+                              AGG_LOAD_SUFFIX;
+        std::transform(fn_name.begin(), fn_name.end(), fn_name.begin(),
+                       [](unsigned char c) { return std::tolower(c); });
+
+        // TODO(weixiang): check whether building the type this way has hidden pitfalls.
+        DataTypePtr value_type = Schema::get_data_type_ptr(*_schema->column(cid));
+        DataTypes argument_types {value_type};
+        Array params;
+        AggregateFunctionPtr fn =
+                factory.get(fn_name, argument_types, params, value_type->is_nullable());
+        DCHECK(fn != nullptr);
+        _agg_functions[cid] = fn;
+    }
+}
+
+
+void BlockAggregator::append_block(Block* block) {
+    if (block == nullptr || block->rows() == 0) { // nothing to append
+        return;
+    }
+    const size_t row_count = block->rows();
+    const size_t key_num = _schema->num_key_columns();
+    _agg_data_counters.reserve(_agg_data_counters.size() + row_count);
+    size_t run_length = 0; // length of the current run of adjacent rows with equal keys
+    for (size_t row = 0; row < row_count; ++row) {
+        ++run_length;
+        if (row + 1 == row_count ||
+            block->compare_at(row, row + 1, key_num, *block, -1) != 0) {
+            _agg_data_counters.push_back(run_length);
+            run_length = 0;
+        }
+    }
+    if (!_is_first_append) {
+        _aggregated_block->add_rows(block, 0, row_count);
+    } else {
+        _aggregated_block = std::make_shared<MutableBlock>(block); // first append builds the block
+        _is_first_append = false;
+    }
+}
+
+/**
+ * @brief Aggregate the rows most recently added by append_block().
+ *
+ * _agg_data_counters[i] is the length of the i-th run of consecutive rows with equal keys
+ * among the newly appended rows, which start at row _cumulative_agg_num of
+ * _aggregated_block. Every run is collapsed into a single row; the block is then rebuilt
+ * as the rows produced by earlier calls followed by the freshly aggregated rows.
+ *
+ * TODO(weixiang): refactor this function into several smaller ones (init, etc.).
+ */
+
+void BlockAggregator::partial_sort_merged_aggregate() {
+    DCHECK(!_agg_data_counters.empty());
+    const size_t agged_row_num = _agg_data_counters.size();
+    // Rows [0, kept_row_num) were aggregated by earlier calls and must be preserved.
+    const size_t kept_row_num = _cumulative_agg_num;
+
+    // first_row_idx[i]: row of _aggregated_block at which the i-th new run starts.
+    std::vector<int> first_row_idx; // TODO(weixiang): add into member variables
+    first_row_idx.reserve(agged_row_num);
+    int row_pos = static_cast<int>(kept_row_num);
+    for (const int run_length : _agg_data_counters) {
+        first_row_idx.push_back(row_pos);
+        row_pos += run_length;
+    }
+
+    auto col_ids = _schema->column_ids();
+    std::vector<MutableColumnPtr> aggregated_cols;
+    aggregated_cols.reserve(_cols_num);
+
+    // Keys: keep the rows of earlier calls, then take the first row of every new run.
+    for (size_t cid = 0; cid < _key_cols_num; cid++) {
+        const IColumn& src_key_col = *_aggregated_block->mutable_columns()[cid];
+        MutableColumnPtr key_col =
+                _schema->get_data_type_ptr(*_schema->column(col_ids[cid]))->create_column();
+        key_col->insert_range_from(src_key_col, 0, kept_row_num);
+        key_col->insert_indices_from(src_key_col, first_row_idx.data(),
+                                     first_row_idx.data() + agged_row_num);
+        aggregated_cols.emplace_back(std::move(key_col));
+    }
+
+    // Values: handle one column at a time so its states are created, filled, emitted and
+    // destroyed together.
+    for (size_t cid = _key_cols_num; cid < _cols_num; cid++) {
+        const AggregateFunctionPtr& agg_function = _agg_functions[cid];
+        const size_t place_size = agg_function->size_of_data();
+        const IColumn* src_value_col = _aggregated_block->mutable_columns()[cid].get();
+        MutableColumnPtr dst_value_col =
+                _schema->get_data_type_ptr(*_schema->column(col_ids[cid]))->create_column();
+        dst_value_col->insert_range_from(*src_value_col, 0, kept_row_num);
+
+        // One aggregate state per new run, laid out back to back in a single buffer that is
+        // released automatically when this iteration ends.
+        std::unique_ptr<char[]> places(new char[place_size * agged_row_num]);
+        _agg_places[cid - _key_cols_num] = places.get();
+        for (size_t i = 0; i < agged_row_num; i++) {
+            agg_function->create(places.get() + place_size * i);
+        }
+
+        // The runs begin right after the preserved rows, matching first_row_idx.
+        size_t agg_begin_idx = kept_row_num;
+        for (size_t i = 0; i < agged_row_num; i++) {
+            AggregateDataPtr place = places.get() + place_size * i;
+            // add_batch_range() is given the first and the last row of the run.
+            agg_function->add_batch_range(agg_begin_idx,
+                                          agg_begin_idx + _agg_data_counters[i] - 1, place,
+                                          &src_value_col, nullptr);
+            agg_begin_idx += _agg_data_counters[i];
+            agg_function->insert_result_into(place, *dst_value_col);
+        }
+
+        // Run the state destructors before the buffer is freed so that states owning
+        // memory do not leak it.
+        for (size_t i = 0; i < agged_row_num; i++) {
+            agg_function->destroy(places.get() + place_size * i);
+        }
+        _agg_places[cid - _key_cols_num] = nullptr; // never leave a dangling pointer behind
+        aggregated_cols.emplace_back(std::move(dst_value_col));
+    }
+
+    // Swap the block content for: preserved rows + freshly aggregated rows.
+    _aggregated_block->clear_column_data();
+    _aggregated_block->append_from_columns(aggregated_cols, kept_row_num + agged_row_num);
+    _agg_data_counters.clear();
+    _cumulative_agg_num += agged_row_num;
+}
+
+
+
+size_t BlockAggregator::get_bytes_usage() const {
+    // Reports the bytes allocated by the aggregated block, or 0 while there is none
+    // (before the first append_block() or after reset_aggregator()).
+    const bool has_block = !UNLIKELY(_aggregated_block == nullptr);
+    return has_block ? _aggregated_block->allocated_bytes() : 0;
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/block_aggregator.h b/be/src/vec/aggregate_functions/block_aggregator.h
new file mode 100644
index 0000000000..510e614dd2
--- /dev/null
+++ b/be/src/vec/aggregate_functions/block_aggregator.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#pragma once
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/core/block.h"
+#include "olap/schema.h"
+
+namespace doris::vectorized {
+
+using BlockPtr = std::shared_ptr<Block>;
+using MutableBlockPtr = std::shared_ptr<MutableBlock>;
+
+class BlockAggregator {
+    // Collapses runs of adjacent rows with equal keys in the appended blocks into single
+    // rows, using one aggregate function per value column.
+public:
+    BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted);
+    ~BlockAggregator();
+    void append_block(Block* block); // buffers the rows and records the equal-key run lengths
+    void partial_sort_merged_aggregate(); // aggregates the runs recorded by append_block()
+    void _init_agg_functions(); // called by the constructor
+    size_t get_bytes_usage() const; // allocated bytes of the aggregated block, 0 if none
+
+    MutableBlockPtr get_partial_agged_block() { // null before the first append / after a reset
+        return _aggregated_block;
+    }
+
+    void reset_aggregator() { // drops the aggregated block and all bookkeeping for it
+        _aggregated_block.reset();
+        _agg_data_counters.clear();
+        _cumulative_agg_num = 0;
+        _is_first_append = true;
+    }
+
+private:
+    bool _is_first_append = true; // true until append_block() creates _aggregated_block
+    size_t _key_cols_num; // set by _init_agg_functions()
+    size_t _value_cols_num; // set by _init_agg_functions(): _cols_num - _key_cols_num
+    size_t _cumulative_agg_num = 0; // grows by the rows produced by each aggregation call
+    size_t _cols_num; // set by _init_agg_functions()
+    const Schema* _schema;
+    const TabletSchema* _tablet_schema;
+    bool _src_sorted; // stored by the constructor; not read by the code in this commit
+    MutableBlockPtr _aggregated_block;
+    std::vector<int> _agg_data_counters; // equal-key run lengths, cleared after aggregation
+    std::vector<AggregateFunctionPtr> _agg_functions; // indexed by column id, value columns only
+    // Scratch pointers to aggregate-state buffers, one per value column; only meaningful
+    std::vector<AggregateDataPtr> _agg_places; // while partial_sort_merged_aggregate() runs
+    
+};
+
+} // namespace
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index aa482fcfbf..8efc0e0e1b 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -897,6 +897,14 @@ Block MutableBlock::to_block(int start_column, int end_column) {
     return {columns_with_schema};
 }
 
+void MutableBlock::clear_column_data() noexcept { // clears every non-null column in place
+    for (size_t i = 0; i < _columns.size(); ++i) {
+        if (_columns[i]) {
+            _columns[i]->clear();
+        }
+    }
+}
+
 std::string MutableBlock::dump_data(size_t row_limit) const {
     std::vector<std::string> headers;
     std::vector<size_t> headers_size;
@@ -956,6 +964,18 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size) const {
     return temp_block;
 }
 
+// Builds a new block with this block's column types (column names are left empty), each
+// column resized to `size`. TODO(weixiang): unique_ptr?
+std::shared_ptr<MutableBlock> MutableBlock::create_same_struct_block(size_t size) const {
+    Block skeleton;
+    for (const auto& data_type : _data_types) {
+        auto fresh_column = data_type->create_column();
+        fresh_column->resize(size);
+        skeleton.insert({std::move(fresh_column), data_type, ""});
+    }
+    return std::make_shared<MutableBlock>(std::move(skeleton));
+}
+
 void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) {
     for (auto idx : char_type_idx) {
         if (idx < data.size()) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 729f531291..7a6f61deb9 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -352,12 +352,25 @@ public:
     size_t rows() const;
     size_t columns() const { return _columns.size(); }
 
+
+    std::shared_ptr<MutableBlock> create_same_struct_block(size_t size) const;
+
+    void clear_column_data() noexcept;
+
     bool empty() const { return rows() == 0; }
 
     MutableColumns& mutable_columns() { return _columns; }
 
     void set_muatable_columns(MutableColumns&& columns) { _columns = std::move(columns); }
 
+    void append_from_columns(MutableColumns& columns, size_t length) { // appends columns[i][0, length) to _columns[i]
+        DCHECK(_columns.size() == columns.size());
+        for (size_t i = 0; i < _columns.size(); i++) {
+            DCHECK(columns[i]->size() >= length); // the source must hold at least length rows
+            _columns[i]->insert_range_from(*columns[i], 0, length);
+        }
+    }
+
     DataTypes& data_types() { return _data_types; }
 
     MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org