You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/06/29 07:07:28 UTC
[doris] branch master updated: [Feature] [Vectorized] support vectorized schema-change (#10187)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8e713ddfcf [Feature] [Vectorized] support vectorized schema-change (#10187)
8e713ddfcf is described below
commit 8e713ddfcfe0b2dba6a442c6165ebccd786dc162
Author: Pxl <95...@qq.com>
AuthorDate: Wed Jun 29 15:07:21 2022 +0800
[Feature] [Vectorized] support vectorized schema-change (#10187)
---
be/src/common/config.h | 2 +-
be/src/olap/column_mapping.h | 5 +-
be/src/olap/rowset/segment_v2/column_reader.cpp | 52 ++-
be/src/olap/rowset/segment_v2/column_reader.h | 7 +-
be/src/olap/schema_change.cpp | 438 +++++++++++++++++++++++-
be/src/olap/schema_change.h | 56 ++-
6 files changed, 523 insertions(+), 37 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 96ff2f7ea6..8d655185c2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -245,7 +245,7 @@ CONF_Bool(enable_low_cardinality_optimize, "true");
CONF_mBool(disable_auto_compaction, "false");
// whether enable vectorized compaction
CONF_Bool(enable_vectorized_compaction, "true");
-// whether enable vectorized schema change
+// whether to enable vectorized schema change; materialized-view and rollup tasks will fail while this config is enabled.
CONF_Bool(enable_vectorized_alter_table, "false");
// check the configuration of auto compaction in seconds when auto compaction disabled
diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h
index de82705a91..a3ad7721d1 100644
--- a/be/src/olap/column_mapping.h
+++ b/be/src/olap/column_mapping.h
@@ -20,6 +20,8 @@
#include <gen_cpp/Exprs_types.h>
#include <memory>
+
+#include "olap/tablet_schema.h"
namespace doris {
class WrapperField;
@@ -36,8 +38,9 @@ struct ColumnMapping {
// materialize view transform function used in schema change
std::string materialized_function;
std::shared_ptr<TExpr> expr;
+ const TabletColumn* new_column;
};
using SchemaMapping = std::vector<ColumnMapping>;
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index d790c8aee3..7243736281 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -148,7 +148,7 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {
Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
- BlockCompressionCodec* codec) {
+ BlockCompressionCodec* codec) const {
iter_opts.sanity_check();
PageReadOptions opts;
opts.rblock = iter_opts.rblock;
@@ -847,74 +847,70 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, b
return Status::OK();
}
-void DefaultValueColumnIterator::insert_default_data(vectorized::MutableColumnPtr& dst, size_t n) {
+void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info, size_t type_size,
+ void* mem_value,
+ vectorized::MutableColumnPtr& dst, size_t n) {
vectorized::Int128 int128;
char* data_ptr = (char*)&int128;
size_t data_len = sizeof(int128);
- auto insert_column_data = [&]() {
- for (size_t i = 0; i < n; ++i) {
- dst->insert_data(data_ptr, data_len);
- }
- };
-
- switch (_type_info->type()) {
+ switch (type_info->type()) {
case OLAP_FIELD_TYPE_OBJECT:
case OLAP_FIELD_TYPE_HLL: {
dst->insert_many_defaults(n);
break;
}
case OLAP_FIELD_TYPE_DATE: {
- assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::CppType)); //uint24_t
- std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::to_string(_mem_value);
+ assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::CppType)); //uint24_t
+ std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::to_string(mem_value);
vectorized::VecDateTimeValue value;
value.from_date_str(str.c_str(), str.length());
value.cast_to_date();
//TODO: here is int128 = int64, here rely on the logic of little endian
int128 = binary_cast<vectorized::VecDateTimeValue, vectorized::Int64>(value);
- insert_column_data();
+ dst->insert_many_data(data_ptr, data_len, n);
break;
}
case OLAP_FIELD_TYPE_DATETIME: {
- assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::CppType)); //int64_t
- std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::to_string(_mem_value);
+ assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::CppType)); //int64_t
+ std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::to_string(mem_value);
vectorized::VecDateTimeValue value;
value.from_date_str(str.c_str(), str.length());
value.to_datetime();
int128 = binary_cast<vectorized::VecDateTimeValue, vectorized::Int64>(value);
- insert_column_data();
+ dst->insert_many_data(data_ptr, data_len, n);
break;
}
case OLAP_FIELD_TYPE_DATEV2: {
- assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType)); //uint32_t
+ assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType)); //uint32_t
- int128 = *((FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType*)_mem_value);
- insert_column_data();
+ int128 = *((FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType*)mem_value);
+ dst->insert_many_data(data_ptr, data_len, n);
break;
}
case OLAP_FIELD_TYPE_DECIMAL: {
- assert(_type_size ==
+ assert(type_size ==
sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DECIMAL>::CppType)); //decimal12_t
- decimal12_t* d = (decimal12_t*)_mem_value;
+ decimal12_t* d = (decimal12_t*)mem_value;
int128 = DecimalV2Value(d->integer, d->fraction).value();
- insert_column_data();
+ dst->insert_many_data(data_ptr, data_len, n);
break;
}
case OLAP_FIELD_TYPE_STRING:
case OLAP_FIELD_TYPE_VARCHAR:
case OLAP_FIELD_TYPE_CHAR: {
- data_ptr = ((Slice*)_mem_value)->data;
- data_len = ((Slice*)_mem_value)->size;
- insert_column_data();
+ data_ptr = ((Slice*)mem_value)->data;
+ data_len = ((Slice*)mem_value)->size;
+ dst->insert_many_data(data_ptr, data_len, n);
break;
}
default: {
- data_ptr = (char*)_mem_value;
- data_len = _type_size;
- insert_column_data();
+ data_ptr = (char*)mem_value;
+ data_len = type_size;
+ dst->insert_many_data(data_ptr, data_len, n);
}
}
}
@@ -926,7 +922,7 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, vectorized::MutableColu
dst->insert_many_defaults(*n);
} else {
*has_null = false;
- insert_default_data(dst, *n);
+ insert_default_data(_type_info.get(), _type_size, _mem_value, dst, *n);
}
return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index b82dbabb01..eb1536d69c 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -106,7 +106,7 @@ public:
// read a page from file into a page handle
Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
PageHandle* handle, Slice* page_body, PageFooterPB* footer,
- BlockCompressionCodec* codec);
+ BlockCompressionCodec* codec) const;
bool is_nullable() const { return _meta.is_nullable(); }
@@ -449,9 +449,10 @@ public:
ordinal_t get_current_ordinal() const override { return _current_rowid; }
-private:
- void insert_default_data(vectorized::MutableColumnPtr& dst, size_t n);
+ static void insert_default_data(const TypeInfo* type_info, size_t type_size, void* mem_value,
+ vectorized::MutableColumnPtr& dst, size_t n);
+private:
bool _has_default_value;
std::string _default_value;
bool _is_nullable;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 94f20da9d5..83b48c0d5c 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -26,9 +26,10 @@
#include "olap/row.h"
#include "olap/row_block.h"
#include "olap/row_cursor.h"
-#include "olap/rowset/rowset_id_generator.h"
+#include "olap/rowset/segment_v2/column_reader.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
+#include "olap/types.h"
#include "olap/wrapper_field.h"
#include "runtime/mem_tracker.h"
#include "util/defer_op.h"
@@ -91,6 +92,154 @@ private:
std::priority_queue<MergeElement> _heap;
};
+// Merges several already-converted blocks into one key-ordered stream and writes it
+// to a rowset writer, following the key model of the target tablet:
+//   - DUP_KEYS:    every row is written, ordered by key.
+//   - UNIQUE_KEYS: for each run of equal keys only the last row (in stable order) is written.
+//   - AGG_KEYS:    rows with equal keys are folded through the aggregate function of
+//                  each value column and one row per key is written.
+class MultiBlockMerger {
+public:
+    explicit MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    // Sorts all rows of `blocks` by key and writes the merged result to `rowset_writer`.
+    // `*merged_rows` is increased by the number of input rows that were not written,
+    // i.e. rows folded into another row or dropped as duplicates.
+    // Returns the first error reported by the writer, otherwise OK.
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        int rows = 0;
+        for (auto& block : blocks) {
+            rows += block->rows();
+        }
+        if (!rows) {
+            return Status::OK();
+        }
+
+        std::vector<RowRef> row_refs;
+        row_refs.reserve(rows);
+        for (auto& block : blocks) {
+            // Use a full-width index: a 16-bit counter wraps around (and the loop never
+            // terminates) as soon as a block holds more than 65535 rows.
+            const size_t block_rows = block->rows();
+            for (size_t i = 0; i < block_rows; i++) {
+                row_refs.emplace_back(block.get(), i);
+            }
+        }
+        // TODO: try to use pdqsort to replace std::sort
+        // The block version is incremental, so a stable sort keeps rows with equal
+        // keys in input order.
+        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
+
+        auto finalized_block = _tablet->tablet_schema().create_block();
+        int columns = finalized_block.columns();
+        // Start from "all rows merged" and subtract every row that is actually written.
+        *merged_rows += rows;
+
+        if (_tablet->keys_type() == KeysType::AGG_KEYS) {
+            const auto& tablet_schema = _tablet->tablet_schema();
+            int key_number = _tablet->num_key_columns();
+
+            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
+            // Owns the raw storage behind every aggregate state in `agg_places`.
+            std::vector<std::unique_ptr<char[]>> agg_buffers;
+            std::vector<vectorized::AggregateDataPtr> agg_places;
+
+            // Destroys every live aggregate state on all exit paths, including the
+            // early returns taken when the writer reports an error. Declared after the
+            // vectors so that it runs before their storage is freed.
+            Defer destroy_agg_states {[&]() {
+                for (size_t i = 0; i < agg_places.size(); i++) {
+                    agg_functions[i]->destroy(agg_places[i]);
+                }
+            }};
+
+            for (int i = key_number; i < columns; i++) {
+                vectorized::AggregateFunctionPtr function =
+                        tablet_schema.column(i).get_aggregate_function(
+                                {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
+                agg_functions.push_back(function);
+                // create aggregate data
+                agg_buffers.push_back(std::make_unique<char[]>(function->size_of_data()));
+                vectorized::AggregateDataPtr place = agg_buffers.back().get();
+                function->create(place);
+                agg_places.push_back(place);
+            }
+
+            for (int i = 0; i < rows; i++) {
+                const auto& row_ref = row_refs[i];
+
+                for (int j = key_number; j < columns; j++) {
+                    auto column_ptr = row_ref.get_column(j).get();
+                    agg_functions[j - key_number]->add(
+                            agg_places[j - key_number],
+                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
+                            nullptr);
+                }
+
+                // Last row overall, or the next row has a different key: emit the group.
+                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                    for (int j = 0; j < key_number; j++) {
+                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
+                                *row_ref.get_column(j), row_ref.position);
+                    }
+
+                    for (int j = key_number; j < columns; j++) {
+                        agg_functions[j - key_number]->insert_result_into(
+                                agg_places[j - key_number],
+                                finalized_block.get_by_position(j).column->assume_mutable_ref());
+                        // Reset the state for the next key. The finished state is destroyed
+                        // first; constructing a new state over it would drop whatever the
+                        // old state had allocated.
+                        agg_functions[j - key_number]->destroy(agg_places[j - key_number]);
+                        agg_functions[j - key_number]->create(agg_places[j - key_number]);
+                    }
+
+                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
+                        *merged_rows -= finalized_block.rows();
+                        RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
+                        finalized_block.clear_column_data();
+                    }
+                }
+            }
+        } else {
+            std::vector<RowRef> pushed_row_refs;
+            if (_tablet->keys_type() == KeysType::DUP_KEYS) {
+                std::swap(pushed_row_refs, row_refs);
+            } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
+                // Keep only the last row of every run of equal keys.
+                for (int i = 0; i < rows; i++) {
+                    if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                        pushed_row_refs.push_back(row_refs[i]);
+                    }
+                }
+            }
+
+            // update real inserted row number
+            rows = pushed_row_refs.size();
+            *merged_rows -= rows;
+
+            for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
+                int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
+
+                for (int idx = 0; idx < columns; idx++) {
+                    auto column = finalized_block.get_by_position(idx).column->assume_mutable();
+
+                    for (int j = 0; j < limit; j++) {
+                        const auto& row_ref = pushed_row_refs[i + j];
+                        column->insert_from(*row_ref.get_column(idx), row_ref.position);
+                    }
+                }
+                RETURN_IF_ERROR(rowset_writer->add_block(&finalized_block));
+                finalized_block.clear_column_data();
+            }
+        }
+
+        RETURN_IF_ERROR(rowset_writer->flush());
+        return Status::OK();
+    }
+
+private:
+    // Non-owning reference to one row: the block it lives in plus its row index.
+    struct RowRef {
+        RowRef(vectorized::Block* block_, size_t position_)
+                : block(block_), position(position_) {}
+        vectorized::ColumnPtr get_column(int index) const {
+            return block->get_by_position(index).column;
+        }
+        const vectorized::Block* block;
+        size_t position;
+    };
+
+    // Orders RowRefs by the first `_num_columns` (key) columns of the tablet.
+    struct RowRefComparator {
+        RowRefComparator(TabletSharedPtr tablet) : _num_columns(tablet->num_key_columns()) {}
+
+        // Three-way comparison of the key columns; 0 means the keys are equal.
+        int compare(const RowRef& lhs, const RowRef& rhs) const {
+            return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1);
+        }
+
+        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
+            return compare(lhs, rhs) < 0;
+        }
+
+        const size_t _num_columns;
+    };
+
+    TabletSharedPtr _tablet;
+    RowRefComparator _cmp;
+};
+
RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl)
: _desc_tbl(desc_tbl) {
_schema_mapping.resize(tablet_schema.num_columns());
@@ -665,6 +814,128 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
#undef TYPE_REINTERPRET_CAST
#undef ASSIGN_DEFAULT_VALUE
+// Converts one block read with the base schema (`ref_block`) into a block laid out
+// with the new schema (`new_block`), following `_schema_mapping`:
+//   - columns without a source (ref_column < 0) are filled with their default value,
+//   - columns with an expression are computed on `ref_block` and moved over,
+//   - all other columns are moved over unchanged.
+// Columns are moved by swapping, so `ref_block` is left holding the previous
+// contents of `new_block` in the swapped positions.
+// Returns NotSupported for materialized functions and for blocks whose column count
+// differs from the descriptor table (rollup / materialized view).
+Status RowBlockChanger::change_block(vectorized::Block* ref_block,
+                                     vectorized::Block* new_block) const {
+    if (new_block->columns() != _schema_mapping.size()) {
+        LOG(WARNING) << "block does not match with schema mapping rules. "
+                     << "block_schema_size=" << new_block->columns()
+                     << ", mapping_schema_size=" << _schema_mapping.size();
+        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
+    }
+
+    // material-view or rollup task will fail now
+    if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
+        return Status::NotSupported(
+                "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
+                "not supported now. ");
+    }
+
+    std::vector<bool> nullable_tuples;
+    for (int i = 0; i < ref_block->columns(); i++) {
+        nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
+    }
+
+    ObjectPool pool;
+    RuntimeState* state = pool.add(new RuntimeState());
+    state->set_desc_tbl(&_desc_tbl);
+    RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
+
+    const int row_size = ref_block->rows();
+    const int column_size = new_block->columns();
+
+    // swap ref_block[key] and new_block[value]
+    std::map<int, int> swap_idx_map;
+
+    for (int idx = 0; idx < column_size; idx++) {
+        int ref_idx = _schema_mapping[idx].ref_column;
+
+        if (!_schema_mapping[idx].materialized_function.empty()) {
+            return Status::NotSupported("Materialized function not supported now. ");
+        }
+
+        if (ref_idx < 0) {
+            // new column, write default value
+            auto value = _schema_mapping[idx].default_value;
+            auto column = new_block->get_by_position(idx).column->assume_mutable();
+            if (value->is_null()) {
+                DCHECK(column->is_nullable());
+                column->insert_many_defaults(row_size);
+            } else {
+                auto type_info = get_type_info(_schema_mapping[idx].new_column);
+                DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
+                                                                value->ptr(), column, row_size);
+            }
+        } else if (_schema_mapping[idx].expr != nullptr) {
+            // calculate special materialized function, to_bitmap/hll_hash/count_field or cast expr
+            vectorized::VExprContext* ctx = nullptr;
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::create_expr_tree(&pool, *_schema_mapping[idx].expr, &ctx));
+
+            RETURN_IF_ERROR(ctx->prepare(state, row_desc));
+            // Close the context on every way out of this branch. Previously an error
+            // from open(), execute() or the cast check returned without closing it.
+            Defer close_ctx {[&]() { ctx->close(state); }};
+            RETURN_IF_ERROR(ctx->open(state));
+
+            int result_column_id = -1;
+            RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
+            DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size)
+                    << new_block->get_by_position(idx).name << " size invalid"
+                    << ", expect=" << row_size
+                    << ", real=" << ref_block->get_by_position(result_column_id).column->size();
+
+            if (_schema_mapping[idx].expr->nodes[0].node_type == TExprNodeType::CAST_EXPR) {
+                RETURN_IF_ERROR(
+                        _check_cast_valid(ref_block->get_by_position(ref_idx).column,
+                                          ref_block->get_by_position(result_column_id).column));
+            }
+            swap_idx_map[result_column_id] = idx;
+        } else {
+            // same type, just swap column
+            swap_idx_map[ref_idx] = idx;
+        }
+    }
+
+    // Swap only after every expression has run, so expressions still see the
+    // original source columns in ref_block.
+    for (const auto& [ref_pos, new_pos] : swap_idx_map) {
+        new_block->get_by_position(new_pos).column.swap(
+                ref_block->get_by_position(ref_pos).column);
+    }
+
+    return Status::OK();
+}
+
+// Verifies that a cast did not change nullability: the source and result columns
+// must agree on is_nullable(), and for nullable columns the null flag of every row
+// must be identical. Returns DataQualityError on any difference.
+Status RowBlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
+                                          vectorized::ColumnPtr new_column) const {
+    // TODO: rethink this check
+    // This check is to prevent schema-change from causing data loss,
+    // But it is possible to generate null data in material-view or rollup.
+
+    const bool ref_nullable = ref_column->is_nullable();
+    if (ref_nullable != new_column->is_nullable()) {
+        return Status::DataQualityError("column.is_nullable() is changed!");
+    }
+    if (!ref_nullable) {
+        // Nothing else to compare for non-nullable columns.
+        return Status::OK();
+    }
+
+    const auto* ref_flags =
+            vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
+                    ->get_null_map_column()
+                    .get_data()
+                    .data();
+    const auto* new_flags =
+            vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
+                    ->get_null_map_column()
+                    .get_data()
+                    .data();
+
+    // Row-by-row comparison of the two null maps over the source column's length.
+    const size_t row_count = ref_column->size();
+    if (!std::equal(ref_flags, ref_flags + row_count, new_flags)) {
+        return Status::DataQualityError("is_null of data is changed!");
+    }
+    return Status::OK();
+}
+
RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator)
: _row_block_allocator(row_block_allocator), _swap_row_block(nullptr) {}
@@ -1050,6 +1321,34 @@ Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
return res;
}
+// Streams every block of `rowset_reader` through the changer and appends the
+// converted block to `rowset_writer`, without any re-sorting.
+Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
+                                             RowsetWriter* rowset_writer,
+                                             TabletSharedPtr new_tablet,
+                                             TabletSharedPtr base_tablet) {
+    auto source_block =
+            std::make_unique<vectorized::Block>(base_tablet->tablet_schema().create_block());
+    auto target_block =
+            std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block());
+
+    // Expression evaluation may append result columns to the source block; remember
+    // the original width so the block can be trimmed back before the next read.
+    const int source_column_count = source_block->columns();
+
+    // NOTE(review): the status returned by next_block() is ignored; the loop ends as
+    // soon as a read leaves the block empty. Confirm a read error cannot be mistaken
+    // for end of data here.
+    for (rowset_reader->next_block(source_block.get()); source_block->rows();
+         rowset_reader->next_block(source_block.get())) {
+        RETURN_IF_ERROR(_changer.change_block(source_block.get(), target_block.get()));
+        RETURN_IF_ERROR(rowset_writer->add_block(target_block.get()));
+
+        target_block->clear_column_data();
+        source_block->clear_column_data(source_column_count);
+    }
+
+    if (!rowset_writer->flush()) {
+        return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
+    }
+
+    return Status::OK();
+}
+
SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
size_t memory_limitation)
: _row_block_changer(row_block_changer),
@@ -1062,6 +1361,18 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() {
SAFE_DELETE(_row_block_allocator);
}
+// Creates a sorting schema-change worker.
+// `row_block_changer` is stored by reference and must outlive this object.
+// `memory_limitation` is the byte budget for blocks buffered before they are sorted
+// and spilled into an intermediate rowset.
+VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
+                                                   size_t memory_limitation)
+        : _changer(row_block_changer),
+          _memory_limitation(memory_limitation),
+          _temp_delta_versions(Version::mock()) {
+    // Limit the tracker with the value passed in rather than re-reading the global
+    // config, so the enforced limit is the same number that _inner_process reports
+    // as memory_limitation when the budget is exceeded.
+    _mem_tracker = MemTracker::create_tracker(
+            static_cast<int64_t>(memory_limitation),
+            fmt::format("VSchemaChangeWithSorting:changer={}",
+                        std::to_string(int64(&row_block_changer))),
+            StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK);
+}
+
Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet,
@@ -1219,6 +1530,89 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
return res;
}
+// Converts every block of `rowset_reader` to the new schema and buffers the results
+// in memory. Whenever the memory tracker refuses another block, the buffered blocks
+// are sorted and written out as an intermediate rowset (internal sorting). At the
+// end the intermediate rowsets are merged into `rowset_writer` (external sorting).
+Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
+                                                RowsetWriter* rowset_writer,
+                                                TabletSharedPtr new_tablet,
+                                                TabletSharedPtr base_tablet) {
+    // for internal sorting
+    std::vector<std::unique_ptr<vectorized::Block>> blocks;
+
+    // for external sorting
+    // src_rowsets to store the rowset generated by internal sorting
+    std::vector<RowsetSharedPtr> src_rowsets;
+
+    Defer defer {[&]() {
+        // Give back the bytes consumed for blocks that are still buffered. On the
+        // success path `blocks` is already empty; on an error return this keeps the
+        // consumption from staying recorded on the tracker.
+        for (auto& block : blocks) {
+            _mem_tracker->release(block->allocated_bytes());
+        }
+        blocks.clear();
+
+        // remove the intermediate rowsets generated by internal sorting
+        for (auto& row_set : src_rowsets) {
+            StorageEngine::instance()->add_unused_rowset(row_set);
+        }
+    }};
+
+    RowsetSharedPtr rowset = rowset_reader->rowset();
+    SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
+    _temp_delta_versions.first = _temp_delta_versions.second;
+
+    auto new_block =
+            std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block());
+    auto ref_block =
+            std::make_unique<vectorized::Block>(base_tablet->tablet_schema().create_block());
+
+    int origin_columns_size = ref_block->columns();
+
+    // Sorts the buffered blocks into one intermediate rowset and empties the buffer.
+    auto create_rowset = [&]() -> Status {
+        if (blocks.empty()) {
+            return Status::OK();
+        }
+
+        RowsetSharedPtr sorted_rowset;
+        RETURN_IF_ERROR(_internal_sorting(
+                blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
+                new_tablet, BETA_ROWSET, segments_overlap, &sorted_rowset));
+        src_rowsets.push_back(sorted_rowset);
+
+        for (auto& block : blocks) {
+            _mem_tracker->release(block->allocated_bytes());
+        }
+        blocks.clear();
+
+        // increase temp version
+        _temp_delta_versions.second++;
+        return Status::OK();
+    };
+
+    // NOTE(review): the status returned by next_block() is ignored; the loop ends as
+    // soon as a read leaves the block empty. Confirm a read error cannot be mistaken
+    // for end of data here.
+    rowset_reader->next_block(ref_block.get());
+    while (ref_block->rows()) {
+        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
+        if (!_mem_tracker->try_consume(new_block->allocated_bytes())) {
+            // Out of budget: spill what is buffered, then retry once.
+            RETURN_IF_ERROR(create_rowset());
+
+            if (!_mem_tracker->try_consume(new_block->allocated_bytes())) {
+                LOG(WARNING) << "Memory limitation is too small for Schema Change."
+                             << "memory_limitation=" << _memory_limitation;
+                return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
+            }
+        }
+
+        // move unique ptr: the filled block goes into `blocks`, and new_block
+        // receives a fresh empty block for the next iteration.
+        blocks.push_back(
+                std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block()));
+        swap(blocks.back(), new_block);
+
+        ref_block->clear_column_data(origin_columns_size);
+        rowset_reader->next_block(ref_block.get());
+    }
+
+    RETURN_IF_ERROR(create_rowset());
+
+    if (src_rowsets.empty()) {
+        RETURN_IF_ERROR(rowset_writer->flush());
+    } else {
+        RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet));
+    }
+
+    return Status::OK();
+}
+
bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& row_block_arr,
const Version& version, TabletSharedPtr new_tablet,
SegmentsOverlapPB segments_overlap,
@@ -1247,6 +1641,29 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
return true;
}
+// Sorts and merges `blocks` into a new rowset of `new_tablet` with the given version
+// and stores it in `*rowset`. `new_rowset_type` is currently not used by this function.
+// Returns an error if the writer cannot be created, merging fails, or the rowset
+// cannot be built; `*rowset` is only non-null when OK is returned.
+Status VSchemaChangeWithSorting::_internal_sorting(
+        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
+        TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
+        SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
+    uint64_t merged_rows = 0;
+    MultiBlockMerger merger(new_tablet);
+
+    std::unique_ptr<RowsetWriter> rowset_writer;
+    RETURN_IF_ERROR(
+            new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer));
+
+    Defer defer {[&]() {
+        new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+                                                   rowset_writer->rowset_id().to_string());
+    }};
+
+    RETURN_IF_ERROR(merger.merge(blocks, rowset_writer.get(), &merged_rows));
+
+    _add_merged_rows(merged_rows);
+    *rowset = rowset_writer->build();
+    // Do not hand a null rowset back to the caller: it would be stored in the list
+    // of intermediate rowsets and later passed on as if it were valid.
+    if (*rowset == nullptr) {
+        LOG(WARNING) << "failed to build rowset when doing internal sorting. "
+                     << "tablet=" << new_tablet->full_name();
+        return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
+    }
+    return Status::OK();
+}
+
bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
RowsetWriter* rowset_writer,
TabletSharedPtr new_tablet) {
@@ -1275,6 +1692,25 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
return true;
}
+// Merges the intermediate rowsets in `src_rowsets` into `rowset_writer` and adds the
+// merged / filtered row counts reported by the merger to this object's statistics.
+Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
+                                                   RowsetWriter* rowset_writer,
+                                                   TabletSharedPtr new_tablet) {
+    // One reader per intermediate rowset, in the same order as the rowsets.
+    std::vector<RowsetReaderSharedPtr> readers;
+    for (auto& src_rowset : src_rowsets) {
+        RowsetReaderSharedPtr reader;
+        RETURN_IF_ERROR(src_rowset->create_reader(&reader));
+        readers.push_back(reader);
+    }
+
+    Merger::Statistics merge_stats;
+    RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE, readers,
+                                           rowset_writer, &merge_stats));
+
+    _add_merged_rows(merge_stats.merged_rows);
+    _add_filtered_rows(merge_stats.filtered_rows);
+    return Status::OK();
+}
+
Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
<< ", new_tablet_id=" << request.new_tablet_id
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 2f820ae79e..71507fd7fb 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -52,7 +52,12 @@ public:
Status change_row_block(const RowBlock* ref_block, int32_t data_version,
RowBlock* mutable_block, uint64_t* filtered_rows) const;
+ Status change_block(vectorized::Block* ref_block, vectorized::Block* new_block) const;
+
private:
+ Status _check_cast_valid(vectorized::ColumnPtr ref_column,
+ vectorized::ColumnPtr new_column) const;
+
// @brief column-mapping specification of new schema
SchemaMapping _schema_mapping;
@@ -180,6 +185,17 @@ private:
DISALLOW_COPY_AND_ASSIGN(SchemaChangeDirectly);
};
+// Vectorized (block-based) schema change that converts data block by block
+// without re-sorting it.
+class VSchemaChangeDirectly : public SchemaChange {
+public:
+    // `row_block_changer` is stored by reference and must outlive this object.
+    // explicit: a RowBlockChanger must never convert silently into a schema-change worker.
+    explicit VSchemaChangeDirectly(const RowBlockChanger& row_block_changer)
+            : _changer(row_block_changer) {}
+
+private:
+    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                          TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
+    // Not owned.
+    const RowBlockChanger& _changer;
+};
+
// @breif schema change with sorting
class SchemaChangeWithSorting : public SchemaChange {
public:
@@ -206,6 +222,29 @@ private:
DISALLOW_COPY_AND_ASSIGN(SchemaChangeWithSorting);
};
+// Vectorized (block-based) schema change that re-sorts the converted data.
+class VSchemaChangeWithSorting : public SchemaChange {
+public:
+    // `row_block_changer` is stored by reference and must outlive this object.
+    VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer, size_t memory_limitation);
+    ~VSchemaChangeWithSorting() override = default;
+
+private:
+    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                          TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
+    // Sorts the in-memory `blocks` into one rowset with the given `version` and
+    // returns it through `rowset`. (Parameter name matches the definition.)
+    Status _internal_sorting(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                             const Version& version, TabletSharedPtr new_tablet,
+                             RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap,
+                             RowsetSharedPtr* rowset);
+
+    // Merges the rowsets produced by _internal_sorting into `rowset_writer`.
+    Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
+                             TabletSharedPtr new_tablet);
+
+    // Not owned.
+    const RowBlockChanger& _changer;
+    // Byte budget passed to the constructor; reported when the budget is exceeded.
+    size_t _memory_limitation;
+    // Version range used to number the intermediate rowsets.
+    Version _temp_delta_versions;
+    // Tracks the bytes held by blocks buffered for internal sorting.
+    std::shared_ptr<MemTracker> _mem_tracker;
+};
+
class SchemaChangeHandler {
public:
static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
@@ -218,12 +257,23 @@ public:
static std::unique_ptr<SchemaChange> get_sc_procedure(const RowBlockChanger& rb_changer,
bool sc_sorting, bool sc_directly) {
if (sc_sorting) {
- return std::make_unique<SchemaChangeWithSorting>(
- rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+ if (config::enable_vectorized_alter_table) {
+ return std::make_unique<VSchemaChangeWithSorting>(
+ rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+ } else {
+ return std::make_unique<SchemaChangeWithSorting>(
+ rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+ }
}
+
if (sc_directly) {
- return std::make_unique<SchemaChangeDirectly>(rb_changer);
+ if (config::enable_vectorized_alter_table) {
+ return std::make_unique<VSchemaChangeDirectly>(rb_changer);
+ } else {
+ return std::make_unique<SchemaChangeDirectly>(rb_changer);
+ }
}
+
return std::make_unique<LinkedSchemaChange>(rb_changer);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org