You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by we...@apache.org on 2022/06/21 12:55:34 UTC
[doris] 01/05: [feature-wip](stream-load-vec) opt memtable
This is an automated email from the ASF dual-hosted git repository.
weixiang pushed a commit to branch memtable_opt_rebase_bak
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0d3880b41586fe8f159a3c8ab8a395f65cfe0ef7
Author: weixiang <we...@meituan.com>
AuthorDate: Mon Apr 18 21:37:34 2022 +0800
[feature-wip](stream-load-vec) opt memtable
---
be/src/olap/delta_writer.cpp | 7 +-
be/src/olap/memtable.cpp | 147 +++++++++++++++--
be/src/olap/memtable.h | 43 ++++-
be/src/vec/CMakeLists.txt | 1 +
.../vec/aggregate_functions/block_aggregator.cpp | 180 +++++++++++++++++++++
be/src/vec/aggregate_functions/block_aggregator.h | 70 ++++++++
be/src/vec/core/block.cpp | 20 +++
be/src/vec/core/block.h | 13 ++
8 files changed, 461 insertions(+), 20 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 11ee242449..5cf0c9cd22 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -191,11 +191,14 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
int start = 0, end = 0;
+ bool flush = false;
const size_t num_rows = row_idxs.size();
for (; start < num_rows;) {
auto count = end + 1 - start;
if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) {
- _mem_table->insert(block, row_idxs[start], count);
+ if (_mem_table->insert(block, row_idxs[start], count)) {
+ flush = true;
+ }
start += count;
end = start;
} else {
@@ -203,7 +206,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
}
- if (_mem_table->memory_usage() >= config::write_buffer_size) {
+ if (flush || _mem_table->is_full()) {
RETURN_NOT_OK(_flush_memtable_async());
_reset_mem_table();
}
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c7f94ead4f..e1c97f2d4d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -53,6 +53,8 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
// TODO: Support ZOrderComparator in the future
_vec_skip_list = std::make_unique<VecTable>(
_vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
+ _block_aggregator =
+ std::make_unique<vectorized::BlockAggregator>(_schema, _tablet_schema, true);
} else {
_vec_skip_list = nullptr;
if (_keys_type == KeysType::DUP_KEYS) {
@@ -114,27 +116,135 @@ int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
*_pblock, -1);
}
-void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
+bool MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) {
if (_is_first_insertion) {
_is_first_insertion = false;
auto cloneBlock = block->clone_without_columns();
- _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
- _vec_row_comparator->set_block(&_input_mutable_block);
- _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
+ _block = std::make_shared<vectorized::MutableBlock>(&cloneBlock);
if (_keys_type != KeysType::DUP_KEYS) {
_init_agg_functions(block);
}
}
- size_t cursor_in_mutableblock = _input_mutable_block.rows();
- size_t oldsize = _input_mutable_block.allocated_bytes();
- _input_mutable_block.add_rows(block, row_pos, num_rows);
- size_t newsize = _input_mutable_block.allocated_bytes();
- _mem_usage += newsize - oldsize;
- _mem_tracker->consume(newsize - oldsize);
-
- for (int i = 0; i < num_rows; i++) {
- _row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
- _insert_one_row_from_block(_row_in_blocks.back());
+ _block->add_rows(block, row_pos, num_rows);
+ _block_bytes_usage += block->allocated_bytes() * num_rows / block->rows();
+ // The memtable is full, but do not flush immediately.
+ // First try to merge the buffered blocks.
+ // If the memtable is still full after the merge, or the first merge did not
+ // shrink it by much, then flush the memtable to disk.
+ bool is_flush = false;
+ if (is_full()) {
+ size_t before_merge_bytes = bytes_allocated();
+ _merge();
+ size_t after_merged_bytes = bytes_allocated();
+ // TODO(weixiang): magic number here, make it configurable later.
+ if (is_full() || (after_merged_bytes >= before_merge_bytes * 2 / 3 && _merge_count == 1)) {
+ is_flush = true;
+ }
+ }
+ return is_flush;
+}
+
+size_t MemTable::bytes_allocated() const {
+    // Bytes accounted for the not-yet-merged input block plus the bytes
+    // currently reported by the block aggregator.
+    const size_t aggregated_bytes = _block_aggregator->get_bytes_usage();
+    return _block_bytes_usage + aggregated_bytes;
+}
+
+bool MemTable::is_full() const {
+    // Full means the tracked bytes exceed config::write_buffer_size.
+    const auto used_bytes = bytes_allocated();
+    return used_bytes > config::write_buffer_size;
+}
+
+void MemTable::_merge() {
+    // Merging only applies when an input block exists and the table is not DUP_KEYS.
+    const bool mergeable = (_block != nullptr) && (_keys_type != KeysType::DUP_KEYS);
+    if (mergeable) {
+        // Sort the buffered rows, fold them into the aggregator, and count the merge.
+        _sort(false);
+        _agg(false);
+        ++_merge_count;
+    }
+}
+
+void MemTable::_agg(const bool finalize) {
+    // Expects _sorted_block to have been filled by _sort() beforehand.
+    const bool nothing_to_aggregate = (_sorted_block == nullptr) || (_sorted_block->rows() == 0);
+    if (nothing_to_aggregate) {
+        return;
+    }
+    // Hand the sorted rows to the aggregator and let it aggregate them.
+    vectorized::Block sorted_rows = _sorted_block->to_block();
+    _block_aggregator->append_block(&sorted_rows);
+    _block_aggregator->partial_sort_merged_aggregate();
+    // A non-final pass keeps the sorted block and only clears its column data;
+    // the final pass releases it.
+    if (!finalize) {
+        _sorted_block->clear_column_data();
+        return;
+    }
+    _sorted_block.reset();
+}
+
+void MemTable::_sort(const bool finalize) {
+    // Start from the identity permutation over the rows currently in _block.
+    const size_t row_count = _block->rows();
+    _index_for_sort.resize(row_count);
+    for (uint32_t row = 0; row < row_count; ++row) {
+        _index_for_sort[row] = {row, row};
+    }
+
+    // Sort the permutation, then copy the rows in that order into a fresh
+    // block with the same structure.
+    _sort_block_by_rows();
+    _sorted_block = _block->create_same_struct_block(0);
+    _append_sorted_block(_block.get(), _sorted_block.get());
+
+    // The final pass releases the input block; otherwise only its data is cleared.
+    if (!finalize) {
+        _block->clear_column_data();
+    } else {
+        _block.reset();
+    }
+    _block_bytes_usage = 0;
+}
+
+void MemTable::_sort_block_by_rows() {
+    // Order by the key columns; rows whose keys compare equal are ordered by
+    // their incoming_index.
+    const auto row_less = [this](const MemTable::OrderedIndexItem& lhs,
+                                 const MemTable::OrderedIndexItem& rhs) {
+        const int key_cmp = _block->compare_at(lhs.index_in_block, rhs.index_in_block,
+                                               _schema->num_key_columns(), *_block, -1);
+        return key_cmp != 0 ? key_cmp < 0 : lhs.incoming_index < rhs.incoming_index;
+    };
+    std::sort(_index_for_sort.begin(), _index_for_sort.end(), row_less);
+}
+
+void MemTable::_append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst) {
+    // Read the row count before src is converted to a Block below.
+    const size_t total_rows = src->rows();
+
+    // Flatten the sorted permutation into plain row indices of the source block.
+    _sorted_index_in_block.clear();
+    _sorted_index_in_block.reserve(total_rows);
+    for (size_t pos = 0; pos != total_rows; ++pos) {
+        _sorted_index_in_block.push_back(_index_for_sort[pos].index_in_block);
+    }
+
+    // Append the source rows to dst following that index order.
+    vectorized::Block source_rows = src->to_block();
+    auto* indices_begin = _sorted_index_in_block.data();
+    dst->add_rows(&source_rows, indices_begin, indices_begin + total_rows);
+}
+
+void MemTable::finalize() {
+    //TODO(weixiang): check here
+    // Without an input block there is nothing to finalize.
+    if (_block == nullptr) {
+        return;
+    }
+
+    if (_keys_type != KeysType::DUP_KEYS) {
+        // agg mode: first fold any rows still pending in _block into the aggregator.
+        const bool has_pending_rows = _block->rows() > 0;
+        if (has_pending_rows) {
+            _merge();
+        }
+        if (_merge_count <= 1) {
+            _block.reset();
+            _sorted_block.reset();
+        } else {
+            // More than one merge happened: take the aggregator's block, reset the
+            // aggregator, then sort and aggregate that block once more.
+            _block = _block_aggregator->get_partial_agged_block();
+            _block_aggregator->reset_aggregator();
+            _sort(true);
+            _agg(true);
+        }
+
+        // The aggregator's block becomes the block to be flushed.
+        _block_bytes_usage = 0;
+        _sorted_block = _block_aggregator->get_partial_agged_block();
+
+    } else {
+        // dup mode: the rows only need to be sorted.
+        _sort(true);
     }
}
@@ -271,6 +381,13 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
}
Status MemTable::flush() {
+ if (!_skip_list) {
+ finalize();
+ if (_sorted_block == nullptr) {
+ return Status::OK();
+ }
+ }
+
VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id
<< ", memsize: " << memory_usage() << ", rows: " << _rows;
int64_t duration_ns = 0;
@@ -301,7 +418,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
RETURN_NOT_OK(st);
}
} else {
- vectorized::Block block = _collect_vskiplist_results();
+ vectorized::Block block = _sorted_block->to_block();
// beta rowset flush parallel, segment write add block is not
// thread safe, so use tmp variable segment_write instead of
// member variable
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 910cd92270..daca1f71fc 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -27,6 +27,7 @@
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/common/string_ref.h"
#include "vec/core/block.h"
+#include "vec/aggregate_functions/block_aggregator.h"
namespace doris {
@@ -48,11 +49,17 @@ public:
int64_t tablet_id() const { return _tablet_id; }
size_t memory_usage() const { return _mem_tracker->consumption(); }
+
std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }
- inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
- // insert tuple from (row_pos) to (row_pos+num_rows)
- void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+ inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); };
+ //insert tuple from (row_pos) to (row_pos+num_rows)
+ bool insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+
+ bool is_full() const;
+ size_t bytes_allocated() const;
+
+ void finalize();
/// Flush
Status flush();
@@ -146,6 +153,14 @@ private:
// for vectorized
void _insert_one_row_from_block(RowInBlock* row_in_block);
void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
+ void _sort(const bool finalize);
+ void _sort_block_by_rows();
+
+ void _merge();
+
+ void _agg(const bool finalize);
+
+ void _append_sorted_block(vectorized::MutableBlock* src, vectorized::MutableBlock* dst);
int64_t _tablet_id;
Schema* _schema;
@@ -195,13 +210,35 @@ private:
//for vectorized
vectorized::MutableBlock _input_mutable_block;
vectorized::MutableBlock _output_mutable_block;
+
+ struct OrderedIndexItem { // one entry of the row permutation ordered by _sort_block_by_rows()
+ uint32_t index_in_block; // row position inside _block
+ uint32_t incoming_index; // used for sort by column; orders rows whose key columns compare equal
+ };
+
+ using OrderedIndex = std::vector<OrderedIndexItem>;
+
+ OrderedIndex _index_for_sort;
+
+ std::vector<int> _sorted_index_in_block;
+
+ vectorized::MutableBlockPtr _block;
+
+ vectorized::MutableBlockPtr _sorted_block;
+
+ std::unique_ptr<vectorized::BlockAggregator> _block_aggregator;
+
vectorized::Block _collect_vskiplist_results();
+
bool _is_first_insertion;
void _init_agg_functions(const vectorized::Block* block);
std::vector<vectorized::AggregateFunctionPtr> _agg_functions;
std::vector<RowInBlock*> _row_in_blocks;
size_t _mem_usage;
+ size_t _block_bytes_usage = 0;
+ size_t _agg_bytes_usage = 0;
+ int _merge_count = 0;
}; // class MemTable
inline std::ostream& operator<<(std::ostream& os, const MemTable& table) {
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 265d6fd884..ca768bbe3d 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -39,6 +39,7 @@ set(VEC_FILES
aggregate_functions/aggregate_function_group_concat.cpp
aggregate_functions/aggregate_function_percentile_approx.cpp
aggregate_functions/aggregate_function_simple_factory.cpp
+ aggregate_functions/block_aggregator.cpp
columns/collator.cpp
columns/column.cpp
columns/column_array.cpp
diff --git a/be/src/vec/aggregate_functions/block_aggregator.cpp b/be/src/vec/aggregate_functions/block_aggregator.cpp
new file mode 100644
index 0000000000..201cca75e0
--- /dev/null
+++ b/be/src/vec/aggregate_functions/block_aggregator.cpp
@@ -0,0 +1,180 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "block_aggregator.h"
+
+namespace doris::vectorized {
+
+BlockAggregator::BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted)
+ : _schema(schema), _tablet_schema(tablet_schema), _src_sorted(src_sorted) {
+ _init_agg_functions();
+}
+
+// Nothing to release explicitly here: the members clean up after themselves.
+BlockAggregator::~BlockAggregator() = default;
+
+void BlockAggregator::_init_agg_functions() {
+    // Cache the column counts taken from the schema.
+    _key_cols_num = _schema->num_key_columns();
+    _cols_num = _schema->num_columns();
+    _value_cols_num = _cols_num - _key_cols_num;
+    //TODO(weixiang): save memory just use value length.
+    _agg_functions.resize(_cols_num);
+    _agg_places.resize(_value_cols_num);
+
+    // Only value columns (those after the key columns) get an aggregate function.
+    for (uint32_t cid = _key_cols_num; cid < _cols_num; ++cid) {
+        // Function name: the aggregation type's string plus AGG_LOAD_SUFFIX, lower-cased.
+        const FieldAggregationMethod method = _tablet_schema->column(cid).aggregation();
+        std::string function_name =
+                TabletColumn::get_string_by_aggregation_type(method) + AGG_LOAD_SUFFIX;
+        for (char& ch : function_name) {
+            ch = std::tolower(static_cast<unsigned char>(ch));
+        }
+
+        // create aggregate function
+        // TODO(weixiang): check whether writing it this way hides any problems
+        DataTypePtr value_type = Schema::get_data_type_ptr(*_schema->column(cid));
+        DataTypes argument_types;
+        argument_types.emplace_back(value_type);
+        Array params;
+        AggregateFunctionPtr function = AggregateFunctionSimpleFactory::instance().get(
+                function_name, argument_types, params, value_type->is_nullable());
+
+        DCHECK(function != nullptr);
+        _agg_functions[cid] = function;
+    }
+}
+
+
+void BlockAggregator::append_block(Block* block) {
+    // Null or empty input is ignored.
+    if (block == nullptr || block->rows() <= 0) {
+        return;
+    }
+    const size_t incoming_rows = block->rows();
+    const size_t key_num = _schema->num_key_columns();
+    _agg_data_counters.reserve(_agg_data_counters.size() + incoming_rows);
+
+    // Record the length of every run of consecutive rows whose key columns compare equal.
+    size_t run_length = 0;
+    for (size_t row = 0; row < incoming_rows; ++row) {
+        ++run_length;
+        const bool ends_run = (row + 1 == incoming_rows) ||
+                              block->compare_at(row, row + 1, key_num, *block, -1) != 0;
+        if (ends_run) {
+            _agg_data_counters.push_back(run_length);
+            run_length = 0;
+        }
+    }
+
+    if (!_is_first_append) {
+        _aggregated_block->add_rows(block, 0, incoming_rows);
+        return;
+    }
+    // this means it is appending block for the first time
+    _aggregated_block = std::make_shared<MutableBlock>(block);
+    _is_first_append = false;
+}
+
+/**
+ * @brief Aggregate the sorted rows held in _aggregated_block.
+ *  1. _agg_data_counters holds, for each group, the number of consecutive rows
+ *     that share the same keys in the partially sorted block.
+ *  2. first_row_idx records the row index of the first row of each group.
+ *
+ * The key columns of the result take the first row of every group; each value
+ * column is produced by running its aggregate function over the group's rows.
+ * Every aggregate state created here is destroyed before its buffer is freed,
+ * and the freed slots in _agg_places are reset to nullptr.
+ *
+ * TODO(weixiang):
+ *  1. refactor partial_sort_merged_aggregate: split it into several functions (init, etc.)
+ */
+
+void BlockAggregator::partial_sort_merged_aggregate() {
+    DCHECK(!_agg_data_counters.empty());
+    std::vector<int> first_row_idx; // TODO(weixiang): add into member variables
+    std::vector<MutableColumnPtr> aggregated_cols;
+    first_row_idx.reserve(_agg_data_counters.size());
+    // NOTE(review): the group start rows are offset by _cumulative_agg_num, while the
+    // aggregation loop below starts at row 0 -- confirm that the two are meant to differ.
+    int row_pos = _cumulative_agg_num;
+    for (const int group_rows : _agg_data_counters) {
+        first_row_idx.push_back(row_pos);
+        row_pos += group_rows;
+    }
+    auto col_ids = _schema->column_ids();
+    const size_t agged_row_num = first_row_idx.size();
+
+    // for keys: copy the first row of every group
+    for (size_t cid = 0; cid < _key_cols_num; cid++) {
+        MutableColumnPtr key_col =
+                _schema->get_data_type_ptr(*_schema->column(col_ids[cid]))->create_column();
+        key_col->insert_indices_from(*_aggregated_block->mutable_columns()[cid],
+                                     first_row_idx.data(),
+                                     first_row_idx.data() + agged_row_num);
+        aggregated_cols.emplace_back(std::move(key_col));
+    }
+
+    // init agged place for values: one state per group and value column
+    for (size_t cid = _key_cols_num; cid < _cols_num; cid++) {
+        const size_t place_size = _agg_functions[cid]->size_of_data();
+        _agg_places[cid - _key_cols_num] = new char[place_size * agged_row_num];
+        for (size_t i = 0; i < agged_row_num; i++) {
+            AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i;
+            _agg_functions[cid]->create(place);
+        }
+    }
+
+    // do agg: feed each group's row range (inclusive end) into its state
+    for (size_t cid = _key_cols_num; cid < _cols_num; cid++) {
+        const size_t place_size = _agg_functions[cid]->size_of_data();
+        auto* src_value_col_ptr = _aggregated_block->mutable_columns()[cid].get();
+        size_t agg_begin_idx = 0;
+
+        for (size_t i = 0; i < agged_row_num; i++) {
+            AggregateDataPtr place = _agg_places[cid - _key_cols_num] + place_size * i;
+            _agg_functions[cid]->add_batch_range(
+                    agg_begin_idx, agg_begin_idx + _agg_data_counters[i] - 1, place,
+                    const_cast<const doris::vectorized::IColumn**>(&src_value_col_ptr), nullptr);
+            agg_begin_idx += _agg_data_counters[i];
+        }
+    }
+
+    // move to result column
+    for (size_t value_col_idx = 0; value_col_idx < _value_cols_num; value_col_idx++) {
+        const auto& function = _agg_functions[value_col_idx + _key_cols_num];
+        const size_t place_size = function->size_of_data();
+        MutableColumnPtr dst_value_col_ptr =
+                _schema->get_data_type_ptr(*_schema->column(col_ids[value_col_idx + _key_cols_num]))
+                        ->create_column();
+        for (size_t i = 0; i < agged_row_num; i++) {
+            function->insert_result_into(_agg_places[value_col_idx] + i * place_size,
+                                         *dst_value_col_ptr);
+        }
+        aggregated_cols.emplace_back(std::move(dst_value_col_ptr));
+    }
+
+    // free aggregated memory: destroy every state first so that states owning
+    // extra memory release it, then free the buffer and clear the stale pointer
+    for (size_t value_col_idx = 0; value_col_idx < _value_cols_num; value_col_idx++) {
+        const auto& function = _agg_functions[value_col_idx + _key_cols_num];
+        const size_t place_size = function->size_of_data();
+        for (size_t i = 0; i < agged_row_num; i++) {
+            function->destroy(_agg_places[value_col_idx] + i * place_size);
+        }
+        delete[] _agg_places[value_col_idx];
+        _agg_places[value_col_idx] = nullptr;
+    }
+
+    // replace the block content with the aggregated rows
+    _aggregated_block->clear_column_data();
+    _aggregated_block->append_from_columns(aggregated_cols, agged_row_num);
+    _agg_data_counters.clear();
+    _cumulative_agg_num += agged_row_num;
+}
+
+
+
+size_t BlockAggregator::get_bytes_usage() const {
+    // With no aggregated block there is nothing to account for.
+    return UNLIKELY(_aggregated_block == nullptr) ? 0 : _aggregated_block->allocated_bytes();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/block_aggregator.h b/be/src/vec/aggregate_functions/block_aggregator.h
new file mode 100644
index 0000000000..510e614dd2
--- /dev/null
+++ b/be/src/vec/aggregate_functions/block_aggregator.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#pragma once
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/core/block.h"
+#include "olap/schema.h"
+
+namespace doris::vectorized {
+
+using BlockPtr = std::shared_ptr<Block>;
+using MutableBlockPtr = std::shared_ptr<MutableBlock>;
+
+class BlockAggregator { // collects appended blocks and aggregates consecutive rows whose key columns compare equal
+
+
+public:
+ BlockAggregator(const Schema* schema, const TabletSchema* tablet_schema, bool src_sorted); // stores the arguments and calls _init_agg_functions()
+ ~BlockAggregator();
+ void append_block(Block* block); // appends the block's rows and records the length of every equal-key run; null/empty blocks are ignored
+ void partial_sort_merged_aggregate(); // aggregates the recorded runs and replaces the block content with the result
+ void _init_agg_functions(); // NOTE(review): public despite the private-style name; the constructor is its only caller in the visible code
+ size_t get_bytes_usage() const; // allocated bytes of the aggregated block, 0 when there is none
+
+ MutableBlockPtr get_partial_agged_block() { // shares the current aggregated block; null before the first append or after a reset
+ return _aggregated_block;
+ }
+
+ void reset_aggregator() { // drops the aggregated block and returns the bookkeeping to its initial values
+ _aggregated_block.reset();
+ _agg_data_counters.clear();
+ _cumulative_agg_num = 0;
+ _is_first_append = true;
+ }
+
+private:
+ bool _is_first_append = true; // true until append_block() has stored its first block
+ size_t _key_cols_num; // number of key columns reported by the schema
+ size_t _value_cols_num; // _cols_num - _key_cols_num
+ size_t _cumulative_agg_num = 0; // grows by the number of result rows after every aggregation
+ size_t _cols_num; // total number of columns reported by the schema
+ const Schema* _schema;
+ const TabletSchema* _tablet_schema;
+ bool _src_sorted; // stored by the constructor; not read anywhere in the visible code
+ MutableBlockPtr _aggregated_block;
+ std::vector<int> _agg_data_counters; // run lengths recorded by append_block(), cleared after each aggregation
+ std::vector<AggregateFunctionPtr> _agg_functions; // indexed by column id; only value-column slots are filled
+
+ std::vector<AggregateDataPtr> _agg_places; // one state buffer per value column, allocated and freed inside partial_sort_merged_aggregate()
+
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index aa482fcfbf..8efc0e0e1b 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -897,6 +897,14 @@ Block MutableBlock::to_block(int start_column, int end_column) {
return {columns_with_schema};
}
+void MutableBlock::clear_column_data() noexcept {
+    // Clear every column that is present; empty (null) column slots are skipped.
+    for (auto& column : _columns) {
+        if (!column) {
+            continue;
+        }
+        column->clear();
+    }
+}
+
std::string MutableBlock::dump_data(size_t row_limit) const {
std::vector<std::string> headers;
std::vector<size_t> headers_size;
@@ -956,6 +964,18 @@ std::unique_ptr<Block> Block::create_same_struct_block(size_t size) const {
return temp_block;
}
+//TODO(weixiang): unique_ptr?
+// Builds a new MutableBlock holding one freshly created column per data type
+// of this block, each resized to `size`.
+std::shared_ptr<MutableBlock> MutableBlock::create_same_struct_block(size_t size) const {
+    Block skeleton;
+    for (const auto& data_type : _data_types) {
+        auto fresh_column = data_type->create_column();
+        fresh_column->resize(size);
+        skeleton.insert({std::move(fresh_column), data_type, ""});
+    }
+    return std::make_shared<MutableBlock>(std::move(skeleton));
+}
+
void Block::shrink_char_type_column_suffix_zero(const std::vector<size_t>& char_type_idx) {
for (auto idx : char_type_idx) {
if (idx < data.size()) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 729f531291..7a6f61deb9 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -352,12 +352,25 @@ public:
size_t rows() const;
size_t columns() const { return _columns.size(); }
+
+ std::shared_ptr<MutableBlock> create_same_struct_block(size_t size) const;
+
+ void clear_column_data() noexcept;
+
bool empty() const { return rows() == 0; }
MutableColumns& mutable_columns() { return _columns; }
void set_muatable_columns(MutableColumns&& columns) { _columns = std::move(columns); }
+ void append_from_columns(MutableColumns& columns, size_t length) {
+     // Appends the first `length` rows of columns[i] to this block's i-th column.
+     DCHECK(_columns.size() == columns.size());
+     const size_t column_count = _columns.size();
+     for (size_t i = 0; i != column_count; ++i) {
+         DCHECK(columns[i]->size() >= length);
+         auto& target = _columns[i];
+         target->insert_range_from(*columns[i], 0, length);
+     }
+ }
+
DataTypes& data_types() { return _data_types; }
MutableColumnPtr& get_column_by_position(size_t position) { return _columns[position]; }
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org