You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@doris.apache.org by li...@apache.org on 2022/06/29 07:07:28 UTC

[doris] branch master updated: [Feature] [Vectorized] support vectorized schema-change (#10187)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e713ddfcf [Feature] [Vectorized] support vectorized schema-change (#10187)
8e713ddfcf is described below

commit 8e713ddfcfe0b2dba6a442c6165ebccd786dc162
Author: Pxl <95...@qq.com>
AuthorDate: Wed Jun 29 15:07:21 2022 +0800

    [Feature] [Vectorized] support vectorized schema-change (#10187)
---
 be/src/common/config.h                          |   2 +-
 be/src/olap/column_mapping.h                    |   5 +-
 be/src/olap/rowset/segment_v2/column_reader.cpp |  52 ++-
 be/src/olap/rowset/segment_v2/column_reader.h   |   7 +-
 be/src/olap/schema_change.cpp                   | 438 +++++++++++++++++++++++-
 be/src/olap/schema_change.h                     |  56 ++-
 6 files changed, 523 insertions(+), 37 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 96ff2f7ea6..8d655185c2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -245,7 +245,7 @@ CONF_Bool(enable_low_cardinality_optimize, "true");
 CONF_mBool(disable_auto_compaction, "false");
 // whether enable vectorized compaction
 CONF_Bool(enable_vectorized_compaction, "true");
-// whether enable vectorized schema change
+// whether enable vectorized schema change, material-view or rollup task will fail if this config open.
 CONF_Bool(enable_vectorized_alter_table, "false");
 
 // check the configuration of auto compaction in seconds when auto compaction disabled
diff --git a/be/src/olap/column_mapping.h b/be/src/olap/column_mapping.h
index de82705a91..a3ad7721d1 100644
--- a/be/src/olap/column_mapping.h
+++ b/be/src/olap/column_mapping.h
@@ -20,6 +20,8 @@
 #include <gen_cpp/Exprs_types.h>
 
 #include <memory>
+
+#include "olap/tablet_schema.h"
 namespace doris {
 
 class WrapperField;
@@ -36,8 +38,9 @@ struct ColumnMapping {
     // materialize view transform function used in schema change
     std::string materialized_function;
     std::shared_ptr<TExpr> expr;
+    const TabletColumn* new_column;
 };
 
 using SchemaMapping = std::vector<ColumnMapping>;
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/column_reader.cpp b/be/src/olap/rowset/segment_v2/column_reader.cpp
index d790c8aee3..7243736281 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/column_reader.cpp
@@ -148,7 +148,7 @@ Status ColumnReader::new_bitmap_index_iterator(BitmapIndexIterator** iterator) {
 
 Status ColumnReader::read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
                                PageHandle* handle, Slice* page_body, PageFooterPB* footer,
-                               BlockCompressionCodec* codec) {
+                               BlockCompressionCodec* codec) const {
     iter_opts.sanity_check();
     PageReadOptions opts;
     opts.rblock = iter_opts.rblock;
@@ -847,74 +847,70 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, ColumnBlockView* dst, b
     return Status::OK();
 }
 
-void DefaultValueColumnIterator::insert_default_data(vectorized::MutableColumnPtr& dst, size_t n) {
+void DefaultValueColumnIterator::insert_default_data(const TypeInfo* type_info, size_t type_size,
+                                                     void* mem_value,
+                                                     vectorized::MutableColumnPtr& dst, size_t n) {
     vectorized::Int128 int128;
     char* data_ptr = (char*)&int128;
     size_t data_len = sizeof(int128);
 
-    auto insert_column_data = [&]() {
-        for (size_t i = 0; i < n; ++i) {
-            dst->insert_data(data_ptr, data_len);
-        }
-    };
-
-    switch (_type_info->type()) {
+    switch (type_info->type()) {
     case OLAP_FIELD_TYPE_OBJECT:
     case OLAP_FIELD_TYPE_HLL: {
         dst->insert_many_defaults(n);
         break;
     }
     case OLAP_FIELD_TYPE_DATE: {
-        assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::CppType)); //uint24_t
-        std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::to_string(_mem_value);
+        assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::CppType)); //uint24_t
+        std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATE>::to_string(mem_value);
 
         vectorized::VecDateTimeValue value;
         value.from_date_str(str.c_str(), str.length());
         value.cast_to_date();
         //TODO: here is int128 = int64, here rely on the logic of little endian
         int128 = binary_cast<vectorized::VecDateTimeValue, vectorized::Int64>(value);
-        insert_column_data();
+        dst->insert_many_data(data_ptr, data_len, n);
         break;
     }
     case OLAP_FIELD_TYPE_DATETIME: {
-        assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::CppType)); //int64_t
-        std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::to_string(_mem_value);
+        assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::CppType)); //int64_t
+        std::string str = FieldTypeTraits<OLAP_FIELD_TYPE_DATETIME>::to_string(mem_value);
 
         vectorized::VecDateTimeValue value;
         value.from_date_str(str.c_str(), str.length());
         value.to_datetime();
 
         int128 = binary_cast<vectorized::VecDateTimeValue, vectorized::Int64>(value);
-        insert_column_data();
+        dst->insert_many_data(data_ptr, data_len, n);
         break;
     }
     case OLAP_FIELD_TYPE_DATEV2: {
-        assert(_type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType)); //uint32_t
+        assert(type_size == sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType)); //uint32_t
 
-        int128 = *((FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType*)_mem_value);
-        insert_column_data();
+        int128 = *((FieldTypeTraits<OLAP_FIELD_TYPE_DATEV2>::CppType*)mem_value);
+        dst->insert_many_data(data_ptr, data_len, n);
         break;
     }
     case OLAP_FIELD_TYPE_DECIMAL: {
-        assert(_type_size ==
+        assert(type_size ==
                sizeof(FieldTypeTraits<OLAP_FIELD_TYPE_DECIMAL>::CppType)); //decimal12_t
-        decimal12_t* d = (decimal12_t*)_mem_value;
+        decimal12_t* d = (decimal12_t*)mem_value;
         int128 = DecimalV2Value(d->integer, d->fraction).value();
-        insert_column_data();
+        dst->insert_many_data(data_ptr, data_len, n);
         break;
     }
     case OLAP_FIELD_TYPE_STRING:
     case OLAP_FIELD_TYPE_VARCHAR:
     case OLAP_FIELD_TYPE_CHAR: {
-        data_ptr = ((Slice*)_mem_value)->data;
-        data_len = ((Slice*)_mem_value)->size;
-        insert_column_data();
+        data_ptr = ((Slice*)mem_value)->data;
+        data_len = ((Slice*)mem_value)->size;
+        dst->insert_many_data(data_ptr, data_len, n);
         break;
     }
     default: {
-        data_ptr = (char*)_mem_value;
-        data_len = _type_size;
-        insert_column_data();
+        data_ptr = (char*)mem_value;
+        data_len = type_size;
+        dst->insert_many_data(data_ptr, data_len, n);
     }
     }
 }
@@ -926,7 +922,7 @@ Status DefaultValueColumnIterator::next_batch(size_t* n, vectorized::MutableColu
         dst->insert_many_defaults(*n);
     } else {
         *has_null = false;
-        insert_default_data(dst, *n);
+        insert_default_data(_type_info.get(), _type_size, _mem_value, dst, *n);
     }
 
     return Status::OK();
diff --git a/be/src/olap/rowset/segment_v2/column_reader.h b/be/src/olap/rowset/segment_v2/column_reader.h
index b82dbabb01..eb1536d69c 100644
--- a/be/src/olap/rowset/segment_v2/column_reader.h
+++ b/be/src/olap/rowset/segment_v2/column_reader.h
@@ -106,7 +106,7 @@ public:
     // read a page from file into a page handle
     Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
                      PageHandle* handle, Slice* page_body, PageFooterPB* footer,
-                     BlockCompressionCodec* codec);
+                     BlockCompressionCodec* codec) const;
 
     bool is_nullable() const { return _meta.is_nullable(); }
 
@@ -449,9 +449,10 @@ public:
 
     ordinal_t get_current_ordinal() const override { return _current_rowid; }
 
-private:
-    void insert_default_data(vectorized::MutableColumnPtr& dst, size_t n);
+    static void insert_default_data(const TypeInfo* type_info, size_t type_size, void* mem_value,
+                                    vectorized::MutableColumnPtr& dst, size_t n);
 
+private:
     bool _has_default_value;
     std::string _default_value;
     bool _is_nullable;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 94f20da9d5..83b48c0d5c 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -26,9 +26,10 @@
 #include "olap/row.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
-#include "olap/rowset/rowset_id_generator.h"
+#include "olap/rowset/segment_v2/column_reader.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
+#include "olap/types.h"
 #include "olap/wrapper_field.h"
 #include "runtime/mem_tracker.h"
 #include "util/defer_op.h"
@@ -91,6 +92,154 @@ private:
     std::priority_queue<MergeElement> _heap;
 };
 
+class MultiBlockMerger {
+public:
+    MultiBlockMerger(TabletSharedPtr tablet) : _tablet(tablet), _cmp(tablet) {}
+
+    Status merge(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                 RowsetWriter* rowset_writer, uint64_t* merged_rows) {
+        int rows = 0;
+        for (auto& block : blocks) {
+            rows += block->rows();
+        }
+        if (!rows) {
+            return Status::OK();
+        }
+
+        std::vector<RowRef> row_refs;
+        row_refs.reserve(rows);
+        for (auto& block : blocks) {
+            for (uint16_t i = 0; i < block->rows(); i++) {
+                row_refs.emplace_back(block.get(), i);
+            }
+        }
+        // TODO: try to use pdqsort to replace std::sort
+        // The block version is incremental.
+        std::stable_sort(row_refs.begin(), row_refs.end(), _cmp);
+
+        auto finalized_block = _tablet->tablet_schema().create_block();
+        int columns = finalized_block.columns();
+        *merged_rows += rows;
+
+        if (_tablet->keys_type() == KeysType::AGG_KEYS) {
+            auto tablet_schema = _tablet->tablet_schema();
+            int key_number = _tablet->num_key_columns();
+
+            std::vector<vectorized::AggregateFunctionPtr> agg_functions;
+            std::vector<vectorized::AggregateDataPtr> agg_places;
+
+            for (int i = key_number; i < columns; i++) {
+                vectorized::AggregateFunctionPtr function =
+                        tablet_schema.column(i).get_aggregate_function(
+                                {finalized_block.get_data_type(i)}, vectorized::AGG_LOAD_SUFFIX);
+                agg_functions.push_back(function);
+                // create aggregate data
+                vectorized::AggregateDataPtr place = new char[function->size_of_data()];
+                function->create(place);
+                agg_places.push_back(place);
+            }
+
+            for (int i = 0; i < rows; i++) {
+                auto row_ref = row_refs[i];
+
+                for (int j = key_number; j < columns; j++) {
+                    auto column_ptr = row_ref.get_column(j).get();
+                    agg_functions[j - key_number]->add(
+                            agg_places[j - key_number],
+                            const_cast<const vectorized::IColumn**>(&column_ptr), row_ref.position,
+                            nullptr);
+                }
+
+                if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                    for (int j = 0; j < key_number; j++) {
+                        finalized_block.get_by_position(j).column->assume_mutable()->insert_from(
+                                *row_ref.get_column(j), row_ref.position);
+                    }
+
+                    for (int j = key_number; j < columns; j++) {
+                        agg_functions[j - key_number]->insert_result_into(
+                                agg_places[j - key_number],
+                                finalized_block.get_by_position(j).column->assume_mutable_ref());
+                        agg_functions[j - key_number]->create(agg_places[j - key_number]);
+                    }
+
+                    if (i == rows - 1 || finalized_block.rows() == ALTER_TABLE_BATCH_SIZE) {
+                        *merged_rows -= finalized_block.rows();
+                        rowset_writer->add_block(&finalized_block);
+                        finalized_block.clear_column_data();
+                    }
+                }
+            }
+
+            for (int i = 0; i < columns - key_number; i++) {
+                agg_functions[i]->destroy(agg_places[i]);
+                delete[] agg_places[i];
+            }
+        } else {
+            std::vector<RowRef> pushed_row_refs;
+            if (_tablet->keys_type() == KeysType::DUP_KEYS) {
+                std::swap(pushed_row_refs, row_refs);
+            } else if (_tablet->keys_type() == KeysType::UNIQUE_KEYS) {
+                for (int i = 0; i < rows; i++) {
+                    if (i == rows - 1 || _cmp.compare(row_refs[i], row_refs[i + 1])) {
+                        pushed_row_refs.push_back(row_refs[i]);
+                    }
+                }
+            }
+
+            // update real inserted row number
+            rows = pushed_row_refs.size();
+            *merged_rows -= rows;
+
+            for (int i = 0; i < rows; i += ALTER_TABLE_BATCH_SIZE) {
+                int limit = std::min(ALTER_TABLE_BATCH_SIZE, rows - i);
+
+                for (int idx = 0; idx < columns; idx++) {
+                    auto column = finalized_block.get_by_position(idx).column->assume_mutable();
+
+                    for (int j = 0; j < limit; j++) {
+                        auto row_ref = pushed_row_refs[i + j];
+                        column->insert_from(*row_ref.get_column(idx), row_ref.position);
+                    }
+                }
+                rowset_writer->add_block(&finalized_block);
+                finalized_block.clear_column_data();
+            }
+        }
+
+        RETURN_IF_ERROR(rowset_writer->flush());
+        return Status::OK();
+    }
+
+private:
+    struct RowRef {
+        RowRef(vectorized::Block* block_, uint16_t position_)
+                : block(block_), position(position_) {}
+        vectorized::ColumnPtr get_column(int index) const {
+            return block->get_by_position(index).column;
+        }
+        const vectorized::Block* block;
+        uint16_t position;
+    };
+
+    struct RowRefComparator {
+        RowRefComparator(TabletSharedPtr tablet) : _num_columns(tablet->num_key_columns()) {}
+
+        int compare(const RowRef& lhs, const RowRef& rhs) const {
+            return lhs.block->compare_at(lhs.position, rhs.position, _num_columns, *rhs.block, -1);
+        }
+
+        bool operator()(const RowRef& lhs, const RowRef& rhs) const {
+            return compare(lhs, rhs) < 0;
+        }
+
+        const size_t _num_columns;
+    };
+
+    TabletSharedPtr _tablet;
+    RowRefComparator _cmp;
+};
+
 RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl)
         : _desc_tbl(desc_tbl) {
     _schema_mapping.resize(tablet_schema.num_columns());
@@ -665,6 +814,128 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
 #undef TYPE_REINTERPRET_CAST
 #undef ASSIGN_DEFAULT_VALUE
 
+Status RowBlockChanger::change_block(vectorized::Block* ref_block,
+                                     vectorized::Block* new_block) const {
+    if (new_block->columns() != _schema_mapping.size()) {
+        LOG(WARNING) << "block does not match with schema mapping rules. "
+                     << "block_schema_size=" << new_block->columns()
+                     << ", mapping_schema_size=" << _schema_mapping.size();
+        return Status::OLAPInternalError(OLAP_ERR_NOT_INITED);
+    }
+
+    // material-view or rollup task will fail now
+    if (_desc_tbl.get_row_tuples().size() != ref_block->columns()) {
+        return Status::NotSupported(
+                "_desc_tbl.get_row_tuples().size() != ref_block->columns(), maybe because rollup "
+                "not supported now. ");
+    }
+
+    std::vector<bool> nullable_tuples;
+    for (int i = 0; i < ref_block->columns(); i++) {
+        nullable_tuples.emplace_back(ref_block->get_by_position(i).column->is_nullable());
+    }
+
+    ObjectPool pool;
+    RuntimeState* state = pool.add(new RuntimeState());
+    state->set_desc_tbl(&_desc_tbl);
+    RowDescriptor row_desc = RowDescriptor::create_default(_desc_tbl, nullable_tuples);
+
+    const int row_size = ref_block->rows();
+    const int column_size = new_block->columns();
+
+    // swap ref_block[key] and new_block[value]
+    std::map<int, int> swap_idx_map;
+
+    for (int idx = 0; idx < column_size; idx++) {
+        int ref_idx = _schema_mapping[idx].ref_column;
+
+        if (!_schema_mapping[idx].materialized_function.empty()) {
+            return Status::NotSupported("Materialized function not supported now. ");
+        }
+
+        if (ref_idx < 0) {
+            // new column, write default value
+            auto value = _schema_mapping[idx].default_value;
+            auto column = new_block->get_by_position(idx).column->assume_mutable();
+            if (value->is_null()) {
+                DCHECK(column->is_nullable());
+                column->insert_many_defaults(row_size);
+            } else {
+                auto type_info = get_type_info(_schema_mapping[idx].new_column);
+                DefaultValueColumnIterator::insert_default_data(type_info.get(), value->size(),
+                                                                value->ptr(), column, row_size);
+            }
+        } else if (_schema_mapping[idx].expr != nullptr) {
+            // calculate special materialized function, to_bitmap/hll_hash/count_field or cast expr
+            vectorized::VExprContext* ctx = nullptr;
+            RETURN_IF_ERROR(
+                    vectorized::VExpr::create_expr_tree(&pool, *_schema_mapping[idx].expr, &ctx));
+
+            RETURN_IF_ERROR(ctx->prepare(state, row_desc));
+            RETURN_IF_ERROR(ctx->open(state));
+
+            int result_column_id = -1;
+            RETURN_IF_ERROR(ctx->execute(ref_block, &result_column_id));
+            DCHECK(ref_block->get_by_position(result_column_id).column->size() == row_size)
+                    << new_block->get_by_position(idx).name << " size invalid"
+                    << ", expect=" << row_size
+                    << ", real=" << ref_block->get_by_position(result_column_id).column->size();
+
+            if (_schema_mapping[idx].expr->nodes[0].node_type == TExprNodeType::CAST_EXPR) {
+                RETURN_IF_ERROR(
+                        _check_cast_valid(ref_block->get_by_position(ref_idx).column,
+                                          ref_block->get_by_position(result_column_id).column));
+            }
+            swap_idx_map[result_column_id] = idx;
+
+            ctx->close(state);
+        } else {
+            // same type, just swap column
+            swap_idx_map[ref_idx] = idx;
+        }
+    }
+
+    for (auto it : swap_idx_map) {
+        new_block->get_by_position(it.second).column.swap(
+                ref_block->get_by_position(it.first).column);
+    }
+
+    return Status::OK();
+}
+
+Status RowBlockChanger::_check_cast_valid(vectorized::ColumnPtr ref_column,
+                                          vectorized::ColumnPtr new_column) const {
+    // TODO: rethink this check
+    // This check is to prevent schema-change from causing data loss,
+    // But it is possible to generate null data in material-view or rollup.
+
+    if (ref_column->is_nullable() != new_column->is_nullable()) {
+        return Status::DataQualityError("column.is_nullable() is changed!");
+    }
+
+    if (ref_column->is_nullable()) {
+        auto* ref_null_map =
+                vectorized::check_and_get_column<vectorized::ColumnNullable>(ref_column)
+                        ->get_null_map_column()
+                        .get_data()
+                        .data();
+        auto* new_null_map =
+                vectorized::check_and_get_column<vectorized::ColumnNullable>(new_column)
+                        ->get_null_map_column()
+                        .get_data()
+                        .data();
+
+        bool is_changed = false;
+        for (size_t i = 0; i < ref_column->size(); i++) {
+            is_changed |= (ref_null_map[i] != new_null_map[i]);
+        }
+        if (is_changed) {
+            return Status::DataQualityError("is_null of data is changed!");
+        }
+    }
+    return Status::OK();
+}
+
 RowBlockSorter::RowBlockSorter(RowBlockAllocator* row_block_allocator)
         : _row_block_allocator(row_block_allocator), _swap_row_block(nullptr) {}
 
@@ -1050,6 +1321,34 @@ Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
     return res;
 }
 
+Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
+                                             RowsetWriter* rowset_writer,
+                                             TabletSharedPtr new_tablet,
+                                             TabletSharedPtr base_tablet) {
+    auto new_block =
+            std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block());
+    auto ref_block =
+            std::make_unique<vectorized::Block>(base_tablet->tablet_schema().create_block());
+
+    int origin_columns_size = ref_block->columns();
+
+    rowset_reader->next_block(ref_block.get());
+    while (ref_block->rows()) {
+        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
+        RETURN_IF_ERROR(rowset_writer->add_block(new_block.get()));
+
+        new_block->clear_column_data();
+        ref_block->clear_column_data(origin_columns_size);
+        rowset_reader->next_block(ref_block.get());
+    }
+
+    if (!rowset_writer->flush()) {
+        return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
+    }
+
+    return Status::OK();
+}
+
 SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
                                                  size_t memory_limitation)
         : _row_block_changer(row_block_changer),
@@ -1062,6 +1361,18 @@ SchemaChangeWithSorting::~SchemaChangeWithSorting() {
     SAFE_DELETE(_row_block_allocator);
 }
 
+VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
+                                                   size_t memory_limitation)
+        : _changer(row_block_changer),
+          _memory_limitation(memory_limitation),
+          _temp_delta_versions(Version::mock()) {
+    _mem_tracker = MemTracker::create_tracker(
+            config::memory_limitation_per_thread_for_schema_change_bytes,
+            fmt::format("VSchemaChangeWithSorting:changer={}",
+                        std::to_string(int64(&row_block_changer))),
+            StorageEngine::instance()->schema_change_mem_tracker(), MemTrackerLevel::TASK);
+}
+
 Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
                                                RowsetWriter* rowset_writer,
                                                TabletSharedPtr new_tablet,
@@ -1219,6 +1530,89 @@ Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_read
     return res;
 }
 
+Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
+                                                RowsetWriter* rowset_writer,
+                                                TabletSharedPtr new_tablet,
+                                                TabletSharedPtr base_tablet) {
+    // for internal sorting
+    std::vector<std::unique_ptr<vectorized::Block>> blocks;
+
+    // for external sorting
+    // src_rowsets to store the rowset generated by internal sorting
+    std::vector<RowsetSharedPtr> src_rowsets;
+
+    Defer defer {[&]() {
+        // remove the intermediate rowsets generated by internal sorting
+        for (auto& row_set : src_rowsets) {
+            StorageEngine::instance()->add_unused_rowset(row_set);
+        }
+    }};
+
+    RowsetSharedPtr rowset = rowset_reader->rowset();
+    SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
+    _temp_delta_versions.first = _temp_delta_versions.second;
+
+    auto new_block =
+            std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block());
+    auto ref_block =
+            std::make_unique<vectorized::Block>(base_tablet->tablet_schema().create_block());
+
+    int origin_columns_size = ref_block->columns();
+
+    auto create_rowset = [&]() -> Status {
+        if (blocks.empty()) {
+            return Status::OK();
+        }
+
+        RowsetSharedPtr rowset;
+        RETURN_IF_ERROR(_internal_sorting(
+                blocks, Version(_temp_delta_versions.second, _temp_delta_versions.second),
+                new_tablet, BETA_ROWSET, segments_overlap, &rowset));
+        src_rowsets.push_back(rowset);
+
+        for (auto& block : blocks) {
+            _mem_tracker->release(block->allocated_bytes());
+        }
+        blocks.clear();
+
+        // increase temp version
+        _temp_delta_versions.second++;
+        return Status::OK();
+    };
+
+    rowset_reader->next_block(ref_block.get());
+    while (ref_block->rows()) {
+        RETURN_IF_ERROR(_changer.change_block(ref_block.get(), new_block.get()));
+        if (!_mem_tracker->try_consume(new_block->allocated_bytes())) {
+            RETURN_IF_ERROR(create_rowset());
+
+            if (!_mem_tracker->try_consume(new_block->allocated_bytes())) {
+                LOG(WARNING) << "Memory limitation is too small for Schema Change."
+                             << "memory_limitation=" << _memory_limitation;
+                return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
+            }
+        }
+
+        // move unique ptr
+        blocks.push_back(
+                std::make_unique<vectorized::Block>(new_tablet->tablet_schema().create_block()));
+        swap(blocks.back(), new_block);
+
+        ref_block->clear_column_data(origin_columns_size);
+        rowset_reader->next_block(ref_block.get());
+    }
+
+    RETURN_IF_ERROR(create_rowset());
+
+    if (src_rowsets.empty()) {
+        RETURN_IF_ERROR(rowset_writer->flush());
+    } else {
+        RETURN_IF_ERROR(_external_sorting(src_rowsets, rowset_writer, new_tablet));
+    }
+
+    return Status::OK();
+}
+
 bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& row_block_arr,
                                                 const Version& version, TabletSharedPtr new_tablet,
                                                 SegmentsOverlapPB segments_overlap,
@@ -1247,6 +1641,29 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
     return true;
 }
 
+Status VSchemaChangeWithSorting::_internal_sorting(
+        const std::vector<std::unique_ptr<vectorized::Block>>& blocks, const Version& version,
+        TabletSharedPtr new_tablet, RowsetTypePB new_rowset_type,
+        SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset) {
+    uint64_t merged_rows = 0;
+    MultiBlockMerger merger(new_tablet);
+
+    std::unique_ptr<RowsetWriter> rowset_writer;
+    RETURN_IF_ERROR(
+            new_tablet->create_rowset_writer(version, VISIBLE, segments_overlap, &rowset_writer));
+
+    Defer defer {[&]() {
+        new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
+                                                   rowset_writer->rowset_id().to_string());
+    }};
+
+    RETURN_IF_ERROR(merger.merge(blocks, rowset_writer.get(), &merged_rows));
+
+    _add_merged_rows(merged_rows);
+    *rowset = rowset_writer->build();
+    return Status::OK();
+}
+
 bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
                                                 RowsetWriter* rowset_writer,
                                                 TabletSharedPtr new_tablet) {
@@ -1275,6 +1692,25 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
     return true;
 }
 
+Status VSchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_rowsets,
+                                                   RowsetWriter* rowset_writer,
+                                                   TabletSharedPtr new_tablet) {
+    std::vector<RowsetReaderSharedPtr> rs_readers;
+    for (auto& rowset : src_rowsets) {
+        RowsetReaderSharedPtr rs_reader;
+        RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
+        rs_readers.push_back(rs_reader);
+    }
+
+    Merger::Statistics stats;
+    RETURN_IF_ERROR(Merger::vmerge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers,
+                                           rowset_writer, &stats));
+
+    _add_merged_rows(stats.merged_rows);
+    _add_filtered_rows(stats.filtered_rows);
+    return Status::OK();
+}
+
 Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
     LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
               << ", new_tablet_id=" << request.new_tablet_id
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 2f820ae79e..71507fd7fb 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -52,7 +52,12 @@ public:
     Status change_row_block(const RowBlock* ref_block, int32_t data_version,
                             RowBlock* mutable_block, uint64_t* filtered_rows) const;
 
+    Status change_block(vectorized::Block* ref_block, vectorized::Block* new_block) const;
+
 private:
+    Status _check_cast_valid(vectorized::ColumnPtr ref_column,
+                             vectorized::ColumnPtr new_column) const;
+
     // @brief column-mapping specification of new schema
     SchemaMapping _schema_mapping;
 
@@ -180,6 +185,17 @@ private:
     DISALLOW_COPY_AND_ASSIGN(SchemaChangeDirectly);
 };
 
+class VSchemaChangeDirectly : public SchemaChange {
+public:
+    VSchemaChangeDirectly(const RowBlockChanger& row_block_changer) : _changer(row_block_changer) {}
+
+private:
+    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                          TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
+    const RowBlockChanger& _changer;
+};
+
 // @breif schema change with sorting
 class SchemaChangeWithSorting : public SchemaChange {
 public:
@@ -206,6 +222,29 @@ private:
     DISALLOW_COPY_AND_ASSIGN(SchemaChangeWithSorting);
 };
 
+class VSchemaChangeWithSorting : public SchemaChange {
+public:
+    VSchemaChangeWithSorting(const RowBlockChanger& row_block_changer, size_t memory_limitation);
+    ~VSchemaChangeWithSorting() override = default;
+
+private:
+    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                          TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
+    Status _internal_sorting(const std::vector<std::unique_ptr<vectorized::Block>>& blocks,
+                             const Version& temp_delta_versions, TabletSharedPtr new_tablet,
+                             RowsetTypePB new_rowset_type, SegmentsOverlapPB segments_overlap,
+                             RowsetSharedPtr* rowset);
+
+    Status _external_sorting(std::vector<RowsetSharedPtr>& src_rowsets, RowsetWriter* rowset_writer,
+                             TabletSharedPtr new_tablet);
+
+    const RowBlockChanger& _changer;
+    size_t _memory_limitation;
+    Version _temp_delta_versions;
+    std::shared_ptr<MemTracker> _mem_tracker;
+};
+
 class SchemaChangeHandler {
 public:
     static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
@@ -218,12 +257,23 @@ public:
     static std::unique_ptr<SchemaChange> get_sc_procedure(const RowBlockChanger& rb_changer,
                                                           bool sc_sorting, bool sc_directly) {
         if (sc_sorting) {
-            return std::make_unique<SchemaChangeWithSorting>(
-                    rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+            if (config::enable_vectorized_alter_table) {
+                return std::make_unique<VSchemaChangeWithSorting>(
+                        rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+            } else {
+                return std::make_unique<SchemaChangeWithSorting>(
+                        rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+            }
         }
+
         if (sc_directly) {
-            return std::make_unique<SchemaChangeDirectly>(rb_changer);
+            if (config::enable_vectorized_alter_table) {
+                return std::make_unique<VSchemaChangeDirectly>(rb_changer);
+            } else {
+                return std::make_unique<SchemaChangeDirectly>(rb_changer);
+            }
         }
+
         return std::make_unique<LinkedSchemaChange>(rb_changer);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org