You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/06/16 02:50:12 UTC

[incubator-doris] branch master updated: [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003)

This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5805f8077f [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003)
5805f8077f is described below

commit 5805f8077f77295f2dd58a3fc7b6660c7fb24262
Author: Pxl <95...@qq.com>
AuthorDate: Thu Jun 16 10:50:08 2022 +0800

    [Feature] [Vectorized] Some pre-refactorings or interface additions for schema change part2 (#10003)
---
 be/src/agent/task_worker_pool.cpp                  |   6 +-
 be/src/common/config.h                             |   3 +
 be/src/olap/push_handler.cpp                       |  32 +-
 be/src/olap/push_handler.h                         |  43 +-
 be/src/olap/reader.cpp                             |  30 +-
 be/src/olap/reader.h                               |  12 +-
 be/src/olap/rowset/beta_rowset_reader.h            |   3 +-
 be/src/olap/schema_change.cpp                      | 493 ++++++++-------------
 be/src/olap/schema_change.h                        | 166 ++++---
 be/src/olap/tablet.cpp                             |   3 +-
 be/src/olap/task/engine_alter_tablet_task.cpp      |   9 +-
 be/src/olap/task/engine_alter_tablet_task.h        |   8 +-
 .../olap/task/engine_storage_migration_task_v2.cpp |   4 +-
 be/src/olap/tuple_reader.cpp                       |   6 +-
 be/src/olap/tuple_reader.h                         |  15 -
 be/src/runtime/descriptors.cpp                     |  15 +-
 be/src/runtime/descriptors.h                       |  55 ++-
 be/src/runtime/runtime_state.cpp                   |  14 +
 be/src/runtime/runtime_state.h                     |  25 +-
 be/src/runtime/thread_mem_tracker_mgr.h            |   3 +-
 be/src/vec/columns/column.h                        |  14 +-
 be/src/vec/olap/block_reader.cpp                   |   7 +-
 be/src/vec/olap/block_reader.h                     |   7 -
 be/test/olap/schema_change_test.cpp                |   6 +-
 .../java/org/apache/doris/alter/RollupJobV2.java   |  23 +-
 .../org/apache/doris/alter/SchemaChangeJobV2.java  |  51 ++-
 .../java/org/apache/doris/analysis/Analyzer.java   |   7 +-
 .../java/org/apache/doris/analysis/InsertStmt.java |   6 +-
 .../doris/load/loadv2/LoadingTaskPlanner.java      |   6 +-
 .../org/apache/doris/load/loadv2/SparkLoadJob.java |   6 +-
 .../apache/doris/load/update/UpdatePlanner.java    |   6 +-
 .../org/apache/doris/task/AlterReplicaTask.java    |  28 +-
 .../doris/planner/StreamLoadScanNodeTest.java      |  90 +---
 gensrc/thrift/AgentService.thrift                  |   1 +
 34 files changed, 552 insertions(+), 651 deletions(-)

diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp
index 79953f6709..06ca61eef0 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -535,7 +535,7 @@ void TaskWorkerPool::_alter_tablet(const TAgentTaskRequest& agent_task_req, int6
             if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
                 error_msgs.push_back("The data quality does not satisfy, please check your data. ");
             }
-            status = Status::DataQualityError("The data quality does not satisfy");
+            status = sc_status;
         } else {
             status = Status::OK();
         }
@@ -620,7 +620,7 @@ void TaskWorkerPool::_push_worker_thread_callback() {
             agent_task_req = _tasks[index];
             push_req = agent_task_req.push_req;
             _tasks.erase(_tasks.begin() + index);
-        } while (0);
+        } while (false);
 
         if (index < 0) {
             // there is no high priority task in queue
@@ -1764,7 +1764,7 @@ void TaskWorkerPool::_storage_medium_migrate_v2(const TAgentTaskRequest& agent_t
             if (sc_status.precise_code() == OLAP_ERR_DATA_QUALITY_ERR) {
                 error_msgs.push_back("The data quality does not satisfy, please check your data. ");
             }
-            status = Status::DataQualityError("The data quality does not satisfy");
+            status = sc_status;
         } else {
             status = Status::OK();
         }
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 8a922f076e..4885c17193 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -244,6 +244,9 @@ CONF_Bool(enable_low_cardinality_optimize, "true");
 CONF_mBool(disable_auto_compaction, "false");
 // whether enable vectorized compaction
 CONF_Bool(enable_vectorized_compaction, "true");
+// whether enable vectorized schema change
+CONF_Bool(enable_vectorized_alter_table, "false");
+
 // check the configuration of auto compaction in seconds when auto compaction disabled
 CONF_mInt32(check_auto_compaction_interval_seconds, "5");
 
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 02e0d46dc9..4ab7ecfddd 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -22,6 +22,7 @@
 #include <iostream>
 #include <sstream>
 
+#include "common/object_pool.h"
 #include "common/status.h"
 #include "exec/parquet_scanner.h"
 #include "olap/row.h"
@@ -32,11 +33,6 @@
 #include "olap/tablet.h"
 #include "runtime/exec_env.h"
 
-using std::list;
-using std::map;
-using std::string;
-using std::vector;
-
 namespace doris {
 
 // Process push command, the main logical is as follows:
@@ -60,6 +56,9 @@ Status PushHandler::process_streaming_ingestion(TabletSharedPtr tablet, const TP
 
     Status res = Status::OK();
     _request = request;
+
+    DescriptorTbl::create(&_pool, _request.desc_tbl, &_desc_tbl);
+
     std::vector<TabletVars> tablet_vars(1);
     tablet_vars[0].tablet = tablet;
     res = _do_streaming_ingestion(tablet, request, push_type, &tablet_vars, tablet_info_vec);
@@ -315,16 +314,15 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, TabletSharedPtr new_
         // 5. Convert data for schema change tables
         VLOG_TRACE << "load to related tables of schema_change if possible.";
         if (new_tablet != nullptr) {
-            auto schema_change_handler = SchemaChangeHandler::instance();
-            res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset,
-                                                                new_rowset);
+            res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset,
+                                                              new_rowset, *_desc_tbl);
             if (!res.ok()) {
                 LOG(WARNING) << "failed to change schema version for delta."
                              << "[res=" << res << " new_tablet='" << new_tablet->full_name()
                              << "']";
             }
         }
-    } while (0);
+    } while (false);
 
     VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name()
                << ", processed_rows" << num_rows;
@@ -456,16 +454,15 @@ Status PushHandler::_convert(TabletSharedPtr cur_tablet, TabletSharedPtr new_tab
         // 7. Convert data for schema change tables
         VLOG_TRACE << "load to related tables of schema_change if possible.";
         if (new_tablet != nullptr) {
-            auto schema_change_handler = SchemaChangeHandler::instance();
-            res = schema_change_handler->schema_version_convert(cur_tablet, new_tablet, cur_rowset,
-                                                                new_rowset);
+            res = SchemaChangeHandler::schema_version_convert(cur_tablet, new_tablet, cur_rowset,
+                                                              new_rowset, *_desc_tbl);
             if (!res.ok()) {
                 LOG(WARNING) << "failed to change schema version for delta."
                              << "[res=" << res << " new_tablet='" << new_tablet->full_name()
                              << "']";
             }
         }
-    } while (0);
+    } while (false);
 
     SAFE_DELETE(reader);
     VLOG_TRACE << "convert delta file end. res=" << res << ", tablet=" << cur_tablet->full_name()
@@ -502,7 +499,7 @@ IBinaryReader* IBinaryReader::create(bool need_decompress) {
     return reader;
 }
 
-BinaryReader::BinaryReader() : IBinaryReader(), _row_buf(nullptr), _row_buf_size(0) {}
+BinaryReader::BinaryReader() : _row_buf(nullptr), _row_buf_size(0) {}
 
 Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {
     Status res = Status::OK();
@@ -527,7 +524,7 @@ Status BinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {
 
         _tablet = tablet;
         _ready = true;
-    } while (0);
+    } while (false);
 
     if (!res.ok()) {
         SAFE_DELETE_ARRAY(_row_buf);
@@ -637,8 +634,7 @@ Status BinaryReader::next(RowCursor* row) {
 }
 
 LzoBinaryReader::LzoBinaryReader()
-        : IBinaryReader(),
-          _row_buf(nullptr),
+        : _row_buf(nullptr),
           _row_compressed_buf(nullptr),
           _row_info_buf(nullptr),
           _max_row_num(0),
@@ -670,7 +666,7 @@ Status LzoBinaryReader::init(TabletSharedPtr tablet, BinaryFile* file) {
 
         _tablet = tablet;
         _ready = true;
-    } while (0);
+    } while (false);
 
     if (!res.ok()) {
         SAFE_DELETE_ARRAY(_row_info_buf);
diff --git a/be/src/olap/push_handler.h b/be/src/olap/push_handler.h
index 13da3018d4..a290eb01c7 100644
--- a/be/src/olap/push_handler.h
+++ b/be/src/olap/push_handler.h
@@ -45,10 +45,10 @@ struct TabletVars {
 
 class PushHandler {
 public:
-    typedef std::vector<ColumnMapping> SchemaMapping;
+    using SchemaMapping = std::vector<ColumnMapping>;
 
-    PushHandler() {}
-    ~PushHandler() {}
+    PushHandler() = default;
+    ~PushHandler() = default;
 
     // Load local data file into specified tablet.
     Status process_streaming_ingestion(TabletSharedPtr tablet, const TPushReq& request,
@@ -80,6 +80,9 @@ private:
     // mainly tablet_id, version and delta file path
     TPushReq _request;
 
+    ObjectPool _pool;
+    DescriptorTbl* _desc_tbl = nullptr;
+
     int64_t _write_bytes = 0;
     int64_t _write_rows = 0;
     DISALLOW_COPY_AND_ASSIGN(PushHandler);
@@ -88,7 +91,7 @@ private:
 // package FileHandlerWithBuf to read header of dpp output file
 class BinaryFile : public FileHandlerWithBuf {
 public:
-    BinaryFile() {}
+    BinaryFile() = default;
     virtual ~BinaryFile() { close(); }
 
     Status init(const char* path);
@@ -107,7 +110,7 @@ private:
 class IBinaryReader {
 public:
     static IBinaryReader* create(bool need_decompress);
-    virtual ~IBinaryReader() {}
+    virtual ~IBinaryReader() = default;
 
     virtual Status init(TabletSharedPtr tablet, BinaryFile* file) = 0;
     virtual Status finalize() = 0;
@@ -139,14 +142,14 @@ protected:
 class BinaryReader : public IBinaryReader {
 public:
     explicit BinaryReader();
-    virtual ~BinaryReader() { finalize(); }
+    ~BinaryReader() override { finalize(); }
 
-    virtual Status init(TabletSharedPtr tablet, BinaryFile* file);
-    virtual Status finalize();
+    Status init(TabletSharedPtr tablet, BinaryFile* file) override;
+    Status finalize() override;
 
-    virtual Status next(RowCursor* row);
+    Status next(RowCursor* row) override;
 
-    virtual bool eof() { return _curr >= _content_len; }
+    bool eof() override { return _curr >= _content_len; }
 
 private:
     char* _row_buf;
@@ -156,20 +159,20 @@ private:
 class LzoBinaryReader : public IBinaryReader {
 public:
     explicit LzoBinaryReader();
-    virtual ~LzoBinaryReader() { finalize(); }
+    ~LzoBinaryReader() override { finalize(); }
 
-    virtual Status init(TabletSharedPtr tablet, BinaryFile* file);
-    virtual Status finalize();
+    Status init(TabletSharedPtr tablet, BinaryFile* file) override;
+    Status finalize() override;
 
-    virtual Status next(RowCursor* row);
+    Status next(RowCursor* row) override;
 
-    virtual bool eof() { return _curr >= _content_len && _row_num == 0; }
+    bool eof() override { return _curr >= _content_len && _row_num == 0; }
 
 private:
     Status _next_block();
 
-    typedef uint32_t RowNumType;
-    typedef uint64_t CompressedSizeType;
+    using RowNumType = uint32_t;
+    using CompressedSizeType = uint64_t;
 
     char* _row_buf;
     char* _row_compressed_buf;
@@ -184,7 +187,7 @@ private:
 class PushBrokerReader {
 public:
     PushBrokerReader() : _ready(false), _eof(false), _fill_tuple(false) {}
-    ~PushBrokerReader() {}
+    ~PushBrokerReader() = default;
 
     Status init(const Schema* schema, const TBrokerScanRange& t_scan_range,
                 const TDescriptorTable& t_desc_tbl);
@@ -195,8 +198,8 @@ public:
         _ready = false;
         return Status::OK();
     }
-    bool eof() { return _eof; }
-    bool is_fill_tuple() { return _fill_tuple; }
+    bool eof() const { return _eof; }
+    bool is_fill_tuple() const { return _fill_tuple; }
     MemPool* mem_pool() { return _mem_pool.get(); }
 
 private:
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 0d59b7f969..497c014e46 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -23,23 +23,19 @@
 #include <charconv>
 #include <unordered_set>
 
+#include "common/status.h"
 #include "olap/bloom_filter_predicate.h"
 #include "olap/comparison_predicate.h"
 #include "olap/in_list_predicate.h"
 #include "olap/null_predicate.h"
+#include "olap/olap_common.h"
 #include "olap/row.h"
-#include "olap/row_block.h"
 #include "olap/row_cursor.h"
-#include "olap/rowset/beta_rowset_reader.h"
-#include "olap/rowset/column_data.h"
 #include "olap/schema.h"
-#include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "runtime/mem_pool.h"
-#include "runtime/string_value.hpp"
 #include "util/date_func.h"
 #include "util/mem_util.hpp"
-#include "vec/olap/vcollect_iterator.h"
 
 using std::nothrow;
 using std::set;
@@ -313,7 +309,8 @@ Status TabletReader::_init_return_columns(const ReaderParams& read_params) {
         }
         VLOG_NOTICE << "return column is empty, using full column as default.";
     } else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION ||
-                read_params.reader_type == READER_BASE_COMPACTION) &&
+                read_params.reader_type == READER_BASE_COMPACTION ||
+                read_params.reader_type == READER_ALTER_TABLE) &&
                !read_params.return_columns.empty()) {
         _return_columns = read_params.return_columns;
         for (auto id : read_params.return_columns) {
@@ -834,12 +831,6 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
     if (read_params.reader_type == READER_CUMULATIVE_COMPACTION) {
         return Status::OK();
     }
-    Status ret;
-    {
-        std::shared_lock rdlock(_tablet->get_header_lock());
-        ret = _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
-                                   read_params.version.second, this);
-    }
     // Only BASE_COMPACTION need set filter_delete = true
     // other reader type:
     // QUERY will filter the row in query layer to keep right result use where clause.
@@ -847,7 +838,18 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
     if (read_params.reader_type == READER_BASE_COMPACTION) {
         _filter_delete = true;
     }
-    return ret;
+
+    auto delete_init = [&]() -> Status {
+        return _delete_handler.init(_tablet->tablet_schema(), _tablet->delete_predicates(),
+                                    read_params.version.second, this);
+    };
+
+    if (read_params.reader_type == READER_ALTER_TABLE) {
+        return delete_init();
+    }
+
+    std::shared_lock rdlock(_tablet->get_header_lock());
+    return delete_init();
 }
 
 } // namespace doris
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index f91145d07c..2593a51bae 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -20,20 +20,10 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
-#include <list>
-#include <memory>
-#include <queue>
-#include <sstream>
-#include <stack>
-#include <string>
-#include <utility>
-#include <vector>
-
 #include "exprs/bloomfilter_predicate.h"
 #include "olap/column_predicate.h"
 #include "olap/delete_handler.h"
 #include "olap/olap_cond.h"
-#include "olap/olap_define.h"
 #include "olap/row_cursor.h"
 #include "olap/rowset/rowset_reader.h"
 #include "olap/tablet.h"
@@ -130,7 +120,7 @@ public:
 
     uint64_t filtered_rows() const {
         return _stats.rows_del_filtered + _stats.rows_conditions_filtered +
-               _stats.rows_vec_del_cond_filtered;
+               _stats.rows_vec_del_cond_filtered + _stats.rows_vec_cond_filtered;
     }
 
     void set_batch_size(int batch_size) { _batch_size = batch_size; }
diff --git a/be/src/olap/rowset/beta_rowset_reader.h b/be/src/olap/rowset/beta_rowset_reader.h
index 564980bac8..de1251f13f 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -47,7 +47,8 @@ public:
 
     // Return the total number of filtered rows, will be used for validation of schema change
     int64_t filtered_rows() override {
-        return _stats->rows_del_filtered + _stats->rows_conditions_filtered;
+        return _stats->rows_del_filtered + _stats->rows_conditions_filtered +
+               _stats->rows_vec_del_cond_filtered + _stats->rows_vec_cond_filtered;
     }
 
     RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index f7d86d8d86..68724d29bc 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -17,15 +17,12 @@
 
 #include "olap/schema_change.h"
 
-#include <pthread.h>
-#include <signal.h>
-
-#include <algorithm>
 #include <vector>
 
-#include "agent/cgroups_mgr.h"
-#include "common/resource_tls.h"
+#include "common/status.h"
+#include "gutil/integral_types.h"
 #include "olap/merger.h"
+#include "olap/olap_common.h"
 #include "olap/row.h"
 #include "olap/row_block.h"
 #include "olap/row_cursor.h"
@@ -33,21 +30,23 @@
 #include "olap/storage_engine.h"
 #include "olap/tablet.h"
 #include "olap/wrapper_field.h"
-#include "runtime/exec_env.h"
-#include "runtime/mem_pool.h"
 #include "runtime/mem_tracker.h"
 #include "util/defer_op.h"
+#include "vec/aggregate_functions/aggregate_function.h"
+#include "vec/aggregate_functions/aggregate_function_reader.h"
+#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr.h"
+#include "vec/exprs/vexpr_context.h"
+#include "vec/olap/block_reader.h"
 
-using std::deque;
-using std::list;
 using std::nothrow;
-using std::pair;
-using std::string;
-using std::stringstream;
-using std::vector;
 
 namespace doris {
 
+constexpr int ALTER_TABLE_BATCH_SIZE = 4096;
+
 class RowBlockSorter {
 public:
     explicit RowBlockSorter(RowBlockAllocator* allocator);
@@ -92,19 +91,20 @@ private:
     std::priority_queue<MergeElement> _heap;
 };
 
-RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema) {
+RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl)
+        : _desc_tbl(desc_tbl) {
     _schema_mapping.resize(tablet_schema.num_columns());
 }
 
 RowBlockChanger::RowBlockChanger(const TabletSchema& tablet_schema,
-                                 const DeleteHandler* delete_handler) {
+                                 const DeleteHandler* delete_handler, DescriptorTbl desc_tbl)
+        : _desc_tbl(desc_tbl) {
     _schema_mapping.resize(tablet_schema.num_columns());
     _delete_handler = delete_handler;
 }
 
 RowBlockChanger::~RowBlockChanger() {
-    SchemaMapping::iterator it = _schema_mapping.begin();
-    for (; it != _schema_mapping.end(); ++it) {
+    for (auto it = _schema_mapping.begin(); it != _schema_mapping.end(); ++it) {
         SAFE_DELETE(it->default_value);
     }
     _schema_mapping.clear();
@@ -212,7 +212,7 @@ public:
     }
 
 private:
-    typedef std::pair<FieldType, FieldType> convert_type_pair;
+    using convert_type_pair = std::pair<FieldType, FieldType>;
     std::unordered_set<convert_type_pair, ConvertTypeMapHash> _convert_type_set;
 
     DISALLOW_COPY_AND_ASSIGN(ConvertTypeResolver);
@@ -230,7 +230,7 @@ ConvertTypeResolver::ConvertTypeResolver() {
     add_convert_type_mapping<OLAP_FIELD_TYPE_CHAR, OLAP_FIELD_TYPE_DATE>();
 
     // supported type convert should annotate in doc:
-    // http://doris.incubator.apache.org/master/zh-CN/sql-reference/sql-statements/Data%20Definition/ALTER%20TABLE.html#description
+    // https://doris.apache.org/zh-CN/sql-manual/sql-reference/Data-Definition-Statements/Alter/ALTER-TABLE-COLUMN.html#alter-table-column
     // If type convert is supported here, you should check fe/src/main/java/org/apache/doris/catalog/ColumnType.java to supported it either
     // from varchar type
     add_convert_type_mapping<OLAP_FIELD_TYPE_VARCHAR, OLAP_FIELD_TYPE_TINYINT>();
@@ -287,7 +287,7 @@ ConvertTypeResolver::ConvertTypeResolver() {
     add_convert_type_mapping<OLAP_FIELD_TYPE_INT, OLAP_FIELD_TYPE_DATE>();
 }
 
-ConvertTypeResolver::~ConvertTypeResolver() {}
+ConvertTypeResolver::~ConvertTypeResolver() = default;
 
 bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
                int field_idx, int ref_field_idx, MemPool* mem_pool) {
@@ -496,7 +496,7 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
         if (_schema_mapping[i].ref_column >= 0) {
             if (!_schema_mapping[i].materialized_function.empty()) {
                 bool (*_do_materialized_transform)(RowCursor*, RowCursor*, const TabletColumn&, int,
-                                                   int, MemPool*);
+                                                   int, MemPool*) = nullptr;
                 if (_schema_mapping[i].materialized_function == "to_bitmap") {
                     _do_materialized_transform = to_bitmap;
                 } else if (_schema_mapping[i].materialized_function == "hll_hash") {
@@ -545,7 +545,7 @@ Status RowBlockChanger::change_row_block(const RowBlock* ref_block, int32_t data
                     mutable_block->get_row(new_row_index++, &write_helper);
                     ref_block->get_row(row_index, &read_helper);
 
-                    if (true == read_helper.is_null(ref_column)) {
+                    if (read_helper.is_null(ref_column)) {
                         write_helper.set_null(i);
                     } else {
                         write_helper.set_not_null(i);
@@ -693,7 +693,7 @@ bool RowBlockSorter::sort(RowBlock** row_block) {
 
     RowCursor helper_row;
     auto res = helper_row.init(_swap_row_block->tablet_schema());
-    if (!res.ok()) {
+    if (!res) {
         LOG(WARNING) << "row cursor init failed.res:" << res;
         return false;
     }
@@ -807,7 +807,7 @@ bool RowBlockAllocator::is_memory_enough_for_sorting(size_t num_rows, size_t all
 
 RowBlockMerger::RowBlockMerger(TabletSharedPtr tablet) : _tablet(tablet) {}
 
-RowBlockMerger::~RowBlockMerger() {}
+RowBlockMerger::~RowBlockMerger() = default;
 
 bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWriter* rowset_writer,
                            uint64_t* merged_rows) {
@@ -815,14 +815,24 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
     RowCursor row_cursor;
     std::unique_ptr<MemPool> mem_pool(new MemPool("RowBlockMerger"));
     std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
+
+    auto merge_error = [&]() -> bool {
+        while (!_heap.empty()) {
+            MergeElement element = _heap.top();
+            _heap.pop();
+            SAFE_DELETE(element.row_cursor);
+        }
+        return false;
+    };
+
     if (row_cursor.init(_tablet->tablet_schema()) != Status::OK()) {
         LOG(WARNING) << "fail to init row cursor.";
-        goto MERGE_ERR;
+        return merge_error();
     }
 
     if (!_make_heap(row_block_arr)) {
         // There is error log in _make_heap, so no need to more log.
-        goto MERGE_ERR;
+        return merge_error();
     }
 
     row_cursor.allocate_memory_for_string_type(_tablet->tablet_schema());
@@ -835,7 +845,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
         if (KeysType::DUP_KEYS == _tablet->keys_type()) {
             if (rowset_writer->add_row(row_cursor) != Status::OK()) {
                 LOG(WARNING) << "fail to add row to rowset writer.";
-                goto MERGE_ERR;
+                return merge_error();
             }
             continue;
         }
@@ -850,7 +860,7 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
         agg_finalize_row(&row_cursor, mem_pool.get());
         if (rowset_writer->add_row(row_cursor) != Status::OK()) {
             LOG(WARNING) << "fail to add row to rowset writer.";
-            goto MERGE_ERR;
+            return merge_error();
         }
 
         // the memory allocate by mem pool has been copied,
@@ -860,20 +870,11 @@ bool RowBlockMerger::merge(const std::vector<RowBlock*>& row_block_arr, RowsetWr
     }
     if (rowset_writer->flush() != Status::OK()) {
         LOG(WARNING) << "failed to finalizing writer.";
-        goto MERGE_ERR;
+        return merge_error();
     }
 
     *merged_rows = tmp_merged_rows;
     return true;
-
-MERGE_ERR:
-    while (_heap.size() > 0) {
-        MergeElement element = _heap.top();
-        _heap.pop();
-        SAFE_DELETE(element.row_cursor);
-    }
-
-    return false;
 }
 
 bool RowBlockMerger::_make_heap(const std::vector<RowBlock*>& row_block_arr) {
@@ -914,40 +915,35 @@ void RowBlockMerger::_pop_heap() {
     element.row_block->get_row(element.row_block_index, element.row_cursor);
 
     _heap.push(element);
-    return;
 }
 
-Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader,
-                                   RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
-                                   TabletSharedPtr base_tablet) {
+Status LinkedSchemaChange::process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                                   TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
     // In some cases, there may be more than one type of rowset in a tablet,
     // in which case the conversion cannot be done directly by linked schema change,
     // but requires direct schema change to rewrite the data.
-    if (rowset_reader->type() != new_rowset_writer->type()) {
+    if (rowset_reader->type() != rowset_writer->type()) {
         LOG(INFO) << "the type of rowset " << rowset_reader->rowset()->rowset_id()
                   << " in base tablet " << base_tablet->tablet_id() << " is not same as type "
-                  << new_rowset_writer->type() << ", use direct schema change.";
-        SchemaChangeDirectly scd(_row_block_changer);
-        return scd.process(rowset_reader, new_rowset_writer, new_tablet, base_tablet);
+                  << rowset_writer->type() << ", use direct schema change.";
+        return SchemaChangeHandler::get_sc_procedure(_row_block_changer, false, true)
+                ->process(rowset_reader, rowset_writer, new_tablet, base_tablet);
     } else {
-        Status status = new_rowset_writer->add_rowset_for_linked_schema_change(
+        Status status = rowset_writer->add_rowset_for_linked_schema_change(
                 rowset_reader->rowset(), _row_block_changer.get_schema_mapping());
-        if (!status.ok()) {
+        if (!status) {
             LOG(WARNING) << "fail to convert rowset."
                          << ", new_tablet=" << new_tablet->full_name()
                          << ", base_tablet=" << base_tablet->full_name()
-                         << ", version=" << new_rowset_writer->version().first << "-"
-                         << new_rowset_writer->version().second;
+                         << ", version=" << rowset_writer->version().first << "-"
+                         << rowset_writer->version().second;
         }
         return status;
     }
 }
 
 SchemaChangeDirectly::SchemaChangeDirectly(const RowBlockChanger& row_block_changer)
-        : SchemaChange(),
-          _row_block_changer(row_block_changer),
-          _row_block_allocator(nullptr),
-          _cursor(nullptr) {}
+        : _row_block_changer(row_block_changer), _row_block_allocator(nullptr), _cursor(nullptr) {}
 
 SchemaChangeDirectly::~SchemaChangeDirectly() {
     VLOG_NOTICE << "~SchemaChangeDirectly()";
@@ -985,9 +981,9 @@ Status reserve_block(std::unique_ptr<RowBlock, RowBlockDeleter>* block_handle_pt
     return Status::OK();
 }
 
-Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
-                                     RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
-                                     TabletSharedPtr base_tablet) {
+Status SchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader,
+                                            RowsetWriter* rowset_writer, TabletSharedPtr new_tablet,
+                                            TabletSharedPtr base_tablet) {
     if (_row_block_allocator == nullptr) {
         _row_block_allocator = new RowBlockAllocator(new_tablet->tablet_schema(), 0);
         if (_row_block_allocator == nullptr) {
@@ -1010,16 +1006,6 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
     }
 
     Status res = Status::OK();
-    if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
-        res = rowset_writer->flush();
-        if (!res.ok()) {
-            LOG(WARNING) << "create empty version for schema change failed."
-                         << "version=" << rowset_writer->version().first << "-"
-                         << rowset_writer->version().second;
-            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
-        }
-        return Status::OK();
-    }
 
     VLOG_NOTICE << "init writer. new_tablet=" << new_tablet->full_name()
                 << ", block_row_number=" << new_tablet->num_rows_per_row_block();
@@ -1030,10 +1016,6 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
         }
     });
 
-    // Reset filtered_rows and merged_rows statistic
-    reset_merged_rows();
-    reset_filtered_rows();
-
     RowBlock* ref_row_block = nullptr;
     rowset_reader->next_block(&ref_row_block);
     while (ref_row_block != nullptr && ref_row_block->has_remaining()) {
@@ -1049,7 +1031,7 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
         RETURN_NOT_OK_LOG(res, "failed to change data in row block.");
 
         // rows filtered by delete handler one by one
-        add_filtered_rows(filtered_rows);
+        _add_filtered_rows(filtered_rows);
 
         if (!_write_row_block(rowset_writer, new_row_block.get())) {
             res = Status::OLAPInternalError(OLAP_ERR_SCHEMA_CHANGE_INFO_INVALID);
@@ -1065,48 +1047,25 @@ Status SchemaChangeDirectly::process(RowsetReaderSharedPtr rowset_reader,
         return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
     }
 
-    // rows filtered by zone map against delete handler
-    add_filtered_rows(rowset_reader->filtered_rows());
-
-    // Check row num changes
-    if (config::row_nums_check) {
-        if (rowset_reader->rowset()->num_rows() !=
-            rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
-            LOG(WARNING) << "fail to check row num! "
-                         << "source_rows=" << rowset_reader->rowset()->num_rows()
-                         << ", merged_rows=" << merged_rows()
-                         << ", filtered_rows=" << filtered_rows()
-                         << ", new_index_rows=" << rowset_writer->num_rows();
-            res = Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
-        }
-    }
-    LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
-              << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
-              << ", new_index_rows=" << rowset_writer->num_rows();
     return res;
 }
 
 SchemaChangeWithSorting::SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
                                                  size_t memory_limitation)
-        : SchemaChange(),
-          _row_block_changer(row_block_changer),
+        : _row_block_changer(row_block_changer),
           _memory_limitation(memory_limitation),
-          _row_block_allocator(nullptr) {
-    // Every time SchemaChange is used for external rowing, some temporary versions (such as 999, 1000, 1001) will be written, in order to avoid Cache conflicts, temporary
-    // The version number takes a BIG NUMBER plus the version number of the current SchemaChange
-    _temp_delta_versions.first = (1 << 28);
-    _temp_delta_versions.second = (1 << 28);
-    // TODO(zyh): remove the magic number
-}
+          _temp_delta_versions(Version::mock()),
+          _row_block_allocator(nullptr) {}
 
 SchemaChangeWithSorting::~SchemaChangeWithSorting() {
     VLOG_NOTICE << "~SchemaChangeWithSorting()";
     SAFE_DELETE(_row_block_allocator);
 }
 
-Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
-                                        RowsetWriter* new_rowset_writer, TabletSharedPtr new_tablet,
-                                        TabletSharedPtr base_tablet) {
+Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
+                                               RowsetWriter* rowset_writer,
+                                               TabletSharedPtr new_tablet,
+                                               TabletSharedPtr base_tablet) {
     if (_row_block_allocator == nullptr) {
         _row_block_allocator =
                 new (nothrow) RowBlockAllocator(new_tablet->tablet_schema(), _memory_limitation);
@@ -1119,17 +1078,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
     Status res = Status::OK();
     RowsetSharedPtr rowset = rowset_reader->rowset();
 
-    if (rowset->empty() || rowset->num_rows() == 0) {
-        res = new_rowset_writer->flush();
-        if (!res.ok()) {
-            LOG(WARNING) << "create empty version for schema change failed."
-                         << " version=" << new_rowset_writer->version().first << "-"
-                         << new_rowset_writer->version().second;
-            return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
-        }
-        return Status::OK();
-    }
-
     RowBlockSorter row_block_sorter(_row_block_allocator);
 
     // for internal sorting
@@ -1155,10 +1103,6 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
 
     _temp_delta_versions.first = _temp_delta_versions.second;
 
-    // Reset filtered_rows and merged_rows statistic
-    reset_merged_rows();
-    reset_filtered_rows();
-
     SegmentsOverlapPB segments_overlap = rowset->rowset_meta()->segments_overlap();
     RowBlock* ref_row_block = nullptr;
     rowset_reader->next_block(&ref_row_block);
@@ -1180,7 +1124,7 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
         }
 
         if (new_row_block == nullptr) {
-            if (row_block_arr.size() < 1) {
+            if (row_block_arr.empty()) {
                 LOG(WARNING) << "Memory limitation is too small for Schema Change."
                              << "memory_limitation=" << _memory_limitation;
                 return Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR);
@@ -1212,12 +1156,12 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
         uint64_t filtered_rows = 0;
         res = _row_block_changer.change_row_block(ref_row_block, rowset_reader->version().second,
                                                   new_row_block, &filtered_rows);
-        if (!res.ok()) {
+        if (!res) {
             row_block_arr.push_back(new_row_block);
             LOG(WARNING) << "failed to change data in row block.";
             return res;
         }
-        add_filtered_rows(filtered_rows);
+        _add_filtered_rows(filtered_rows);
 
         if (new_row_block->row_block_info().row_num > 0) {
             if (!row_block_sorter.sort(&new_row_block)) {
@@ -1260,35 +1204,18 @@ Status SchemaChangeWithSorting::process(RowsetReaderSharedPtr rowset_reader,
     }
 
     if (src_rowsets.empty()) {
-        res = new_rowset_writer->flush();
-        if (!res.ok()) {
+        res = rowset_writer->flush();
+        if (!res) {
             LOG(WARNING) << "create empty version for schema change failed."
-                         << " version=" << new_rowset_writer->version().first << "-"
-                         << new_rowset_writer->version().second;
+                         << " version=" << rowset_writer->version().first << "-"
+                         << rowset_writer->version().second;
             return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
         }
-    } else if (!_external_sorting(src_rowsets, new_rowset_writer, new_tablet)) {
+    } else if (!_external_sorting(src_rowsets, rowset_writer, new_tablet)) {
         LOG(WARNING) << "failed to sorting externally.";
         return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
     }
 
-    add_filtered_rows(rowset_reader->filtered_rows());
-
-    // Check row num changes
-    if (config::row_nums_check) {
-        if (rowset_reader->rowset()->num_rows() !=
-            new_rowset_writer->num_rows() + merged_rows() + filtered_rows()) {
-            LOG(WARNING) << "fail to check row num!"
-                         << " source_rows=" << rowset_reader->rowset()->num_rows()
-                         << ", merged_rows=" << merged_rows()
-                         << ", filtered_rows=" << filtered_rows()
-                         << ", new_index_rows=" << new_rowset_writer->num_rows();
-            res = Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
-        }
-    }
-    LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
-              << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
-              << ", new_index_rows=" << new_rowset_writer->num_rows();
     return res;
 }
 
@@ -1315,7 +1242,7 @@ bool SchemaChangeWithSorting::_internal_sorting(const std::vector<RowBlock*>& ro
     }
     new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                rowset_writer->rowset_id().to_string());
-    add_merged_rows(merged_rows);
+    _add_merged_rows(merged_rows);
     *rowset = rowset_writer->build();
     return true;
 }
@@ -1327,31 +1254,27 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
     for (auto& rowset : src_rowsets) {
         RowsetReaderSharedPtr rs_reader;
         auto res = rowset->create_reader(&rs_reader);
-        if (!res.ok()) {
+        if (!res) {
             LOG(WARNING) << "failed to create rowset reader.";
             return false;
         }
-        rs_readers.push_back(std::move(rs_reader));
+        rs_readers.push_back(rs_reader);
     }
 
     Merger::Statistics stats;
     auto res = Merger::merge_rowsets(new_tablet, READER_ALTER_TABLE, rs_readers, rowset_writer,
                                      &stats);
-    if (!res.ok()) {
+    if (!res) {
         LOG(WARNING) << "failed to merge rowsets. tablet=" << new_tablet->full_name()
                      << ", version=" << rowset_writer->version().first << "-"
                      << rowset_writer->version().second;
         return false;
     }
-    add_merged_rows(stats.merged_rows);
-    add_filtered_rows(stats.filtered_rows);
+    _add_merged_rows(stats.merged_rows);
+    _add_filtered_rows(stats.filtered_rows);
     return true;
 }
 
-SchemaChangeHandler::SchemaChangeHandler() {}
-
-SchemaChangeHandler::~SchemaChangeHandler() {}
-
 Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
     LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
               << ", new_tablet_id=" << request.new_tablet_id
@@ -1377,6 +1300,9 @@ Status SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& req
     return res;
 }
 
+std::shared_mutex SchemaChangeHandler::_mutex;
+std::unordered_set<int64_t> SchemaChangeHandler::_tablet_ids_in_converting;
+
 // In the past schema change and rollup will create new tablet  and will wait for txns starting before the task to finished
 // It will cost a lot of time to wait and the task is very difficult to understand.
 // In alter task v2, FE will call BE to create tablet and send an alter task to BE to convert historical data.
@@ -1457,12 +1383,13 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
         reader_context.seek_columns = &return_columns;
         reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
         reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
+        reader_context.batch_size = ALTER_TABLE_BATCH_SIZE;
+        reader_context.is_vec = config::enable_vectorized_alter_table;
 
         do {
             RowsetSharedPtr max_rowset;
             // get history data to be converted and it will check if there is hold in base tablet
-            res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset);
-            if (!res.ok()) {
+            if (!_get_versions_to_be_changed(base_tablet, &versions_to_be_changed, &max_rowset)) {
                 LOG(WARNING) << "fail to get version to be changed. res=" << res;
                 break;
             }
@@ -1514,27 +1441,14 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             }
 
             // init one delete handler
-            int32_t end_version = -1;
+            int64_t end_version = -1;
             for (auto& version : versions_to_be_changed) {
-                if (version.second > end_version) {
-                    end_version = version.second;
-                }
-            }
-
-            res = delete_handler.init(base_tablet->tablet_schema(),
-                                      base_tablet->delete_predicates(), end_version);
-            if (!res.ok()) {
-                LOG(WARNING) << "init delete handler failed. base_tablet="
-                             << base_tablet->full_name() << ", end_version=" << end_version;
-
-                // release delete handlers which have been inited successfully.
-                delete_handler.finalize();
-                break;
+                end_version = std::max(end_version, version.second);
             }
 
             // acquire data sources correspond to history versions
             base_tablet->capture_rs_readers(versions_to_be_changed, &rs_readers);
-            if (rs_readers.size() < 1) {
+            if (rs_readers.empty()) {
                 LOG(WARNING) << "fail to acquire all data sources. "
                              << "version_num=" << versions_to_be_changed.size()
                              << ", data_source_num=" << rs_readers.size();
@@ -1542,22 +1456,47 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
                 break;
             }
 
+            vectorized::BlockReader reader;
+            TabletReader::ReaderParams reader_params;
+            reader_params.tablet = base_tablet;
+            reader_params.reader_type = READER_ALTER_TABLE;
+            reader_params.rs_readers = rs_readers;
+            const auto& schema = base_tablet->tablet_schema();
+            reader_params.return_columns.resize(schema.num_columns());
+            std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
+            reader_params.origin_return_columns = &reader_params.return_columns;
+            reader_params.version = {0, end_version};
+            // BlockReader::init will call base_tablet->get_header_lock(), but this lock we already get at outer layer, so we just call TabletReader::init
+            RETURN_NOT_OK(reader.TabletReader::init(reader_params));
+
+            res = delete_handler.init(base_tablet->tablet_schema(),
+                                      base_tablet->delete_predicates(), end_version, &reader);
+            if (!res) {
+                LOG(WARNING) << "init delete handler failed. base_tablet="
+                             << base_tablet->full_name() << ", end_version=" << end_version;
+
+                // release delete handlers which have been inited successfully.
+                delete_handler.finalize();
+                break;
+            }
+
             for (auto& rs_reader : rs_readers) {
                 res = rs_reader->init(&reader_context);
-                if (!res.ok()) {
+                if (!res) {
                     LOG(WARNING) << "failed to init rowset reader: " << base_tablet->full_name();
                     break;
                 }
             }
-
-        } while (0);
+        } while (false);
     }
 
     do {
-        if (!res.ok()) {
+        if (!res) {
             break;
         }
         SchemaChangeParams sc_params;
+
+        DescriptorTbl::create(&sc_params.pool, request.desc_tbl, &sc_params.desc_tbl);
         sc_params.base_tablet = base_tablet;
         sc_params.new_tablet = new_tablet;
         sc_params.ref_rowset_readers = rs_readers;
@@ -1588,6 +1527,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
                     } else if (item.mv_expr.nodes[0].node_type == TExprNodeType::CASE_EXPR) {
                         mv_param.mv_expr = "count_field";
                     }
+                    mv_param.expr = std::make_shared<TExpr>(item.mv_expr);
                 }
                 sc_params.materialized_params_map.insert(
                         std::make_pair(item.column_name, mv_param));
@@ -1602,26 +1542,26 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             std::lock_guard<std::shared_mutex> wrlock(_mutex);
             _tablet_ids_in_converting.erase(new_tablet->tablet_id());
         }
-        if (!res.ok()) {
+        if (!res) {
             break;
         }
         // set state to ready
         std::lock_guard<std::shared_mutex> new_wlock(new_tablet->get_header_lock());
         res = new_tablet->set_tablet_state(TabletState::TABLET_RUNNING);
-        if (!res.ok()) {
+        if (!res) {
             break;
         }
         new_tablet->save_meta();
-    } while (0);
+    } while (false);
 
-    if (res.ok()) {
+    if (res) {
         // _validate_alter_result should be outside the above while loop.
         // to avoid requiring the header lock twice.
         res = _validate_alter_result(new_tablet, request);
     }
 
     // if failed convert history data, then just remove the new tablet
-    if (!res.ok()) {
+    if (!res) {
         LOG(WARNING) << "failed to alter tablet. base_tablet=" << base_tablet->full_name()
                      << ", drop new_tablet=" << new_tablet->full_name();
         // do not drop the new tablet and its data. GC thread will
@@ -1638,7 +1578,8 @@ bool SchemaChangeHandler::tablet_in_converting(int64_t tablet_id) {
 Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
                                                    TabletSharedPtr new_tablet,
                                                    RowsetSharedPtr* base_rowset,
-                                                   RowsetSharedPtr* new_rowset) {
+                                                   RowsetSharedPtr* new_rowset,
+                                                   DescriptorTbl desc_tbl) {
     Status res = Status::OK();
     LOG(INFO) << "begin to convert delta version for schema changing. "
               << "base_tablet=" << base_tablet->full_name()
@@ -1646,13 +1587,14 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
 
     // a. Parse the Alter request and convert it into an internal representation
     // Do not use the delete condition specified by the DELETE_DATA command
-    RowBlockChanger rb_changer(new_tablet->tablet_schema());
+    RowBlockChanger rb_changer(new_tablet->tablet_schema(), desc_tbl);
     bool sc_sorting = false;
     bool sc_directly = false;
 
     const std::unordered_map<std::string, AlterMaterializedViewParam> materialized_function_map;
-    if (!(res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly,
-                               materialized_function_map))) {
+    if (res = _parse_request(base_tablet, new_tablet, &rb_changer, &sc_sorting, &sc_directly,
+                             materialized_function_map, desc_tbl);
+        !res) {
         LOG(WARNING) << "failed to parse the request. res=" << res;
         return res;
     }
@@ -1660,24 +1602,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
     // NOTE split_table if row_block is used, the original block will become smaller
     // But since the historical data will become normal after the subsequent base/cumulative, it is also possible to use directly
     // b. Generate historical data converter
-    SchemaChange* sc_procedure = nullptr;
-    if (sc_sorting) {
-        LOG(INFO) << "doing schema change with sorting for base_tablet "
-                  << base_tablet->full_name();
-        sc_procedure = new (nothrow) SchemaChangeWithSorting(
-                rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
-    } else if (sc_directly) {
-        LOG(INFO) << "doing schema change directly for base_tablet " << base_tablet->full_name();
-        sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer);
-    } else {
-        LOG(INFO) << "doing linked schema change for base_tablet " << base_tablet->full_name();
-        sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer);
-    }
-
-    if (sc_procedure == nullptr) {
-        LOG(FATAL) << "failed to malloc SchemaChange. size=" << sizeof(SchemaChangeWithSorting);
-        return Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
-    }
+    auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);
 
     // c. Convert data
     DeleteHandler delete_handler;
@@ -1697,6 +1622,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
     reader_context.seek_columns = &return_columns;
     reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
     reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
+    reader_context.is_vec = config::enable_vectorized_alter_table;
 
     RowsetReaderSharedPtr rowset_reader;
     RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
@@ -1709,8 +1635,19 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
             (*base_rowset)->txn_id(), load_id, PREPARED,
             (*base_rowset)->rowset_meta()->segments_overlap(), &rowset_writer));
 
-    if ((res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet,
-                                     base_tablet)) != Status::OK()) {
+    auto schema_version_convert_error = [&]() -> Status {
+        if (*new_rowset != nullptr) {
+            StorageEngine::instance()->add_unused_rowset(*new_rowset);
+        }
+
+        LOG(WARNING) << "failed to convert rowsets. "
+                     << " base_tablet=" << base_tablet->full_name()
+                     << ", new_tablet=" << new_tablet->full_name() << " res = " << res;
+        return res;
+    };
+
+    if (res = sc_procedure->process(rowset_reader, rowset_writer.get(), new_tablet, base_tablet);
+        !res) {
         if ((*base_rowset)->is_pending()) {
             LOG(WARNING) << "failed to process the transaction when schema change. "
                          << "tablet=" << new_tablet->full_name() << "'"
@@ -1722,7 +1659,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
         }
         new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                    rowset_writer->rowset_id().to_string());
-        goto SCHEMA_VERSION_CONVERT_ERR;
+        return schema_version_convert_error();
     }
     *new_rowset = rowset_writer->build();
     new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
@@ -1730,25 +1667,13 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
     if (*new_rowset == nullptr) {
         LOG(WARNING) << "build rowset failed.";
         res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
-        goto SCHEMA_VERSION_CONVERT_ERR;
+        return schema_version_convert_error();
     }
 
-    SAFE_DELETE(sc_procedure);
     LOG(INFO) << "successfully convert rowsets. "
               << " base_tablet=" << base_tablet->full_name()
               << ", new_tablet=" << new_tablet->full_name();
     return res;
-
-SCHEMA_VERSION_CONVERT_ERR:
-    if (*new_rowset != nullptr) {
-        StorageEngine::instance()->add_unused_rowset(*new_rowset);
-    }
-
-    SAFE_DELETE(sc_procedure);
-    LOG(WARNING) << "failed to convert rowsets. "
-                 << " base_tablet=" << base_tablet->full_name()
-                 << ", new_tablet=" << new_tablet->full_name() << " res = " << res;
-    return res;
 }
 
 Status SchemaChangeHandler::_get_versions_to_be_changed(
@@ -1782,42 +1707,41 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
 
     // Add filter information in change, and filter column information will be set in _parse_request
     // And filter some data every time the row block changes
-    RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler);
+    RowBlockChanger rb_changer(sc_params.new_tablet->tablet_schema(), sc_params.delete_handler,
+                               *sc_params.desc_tbl);
 
     bool sc_sorting = false;
     bool sc_directly = false;
-    SchemaChange* sc_procedure = nullptr;
 
     // a.Parse the Alter request and convert it into an internal representation
-    Status res = _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer,
-                                &sc_sorting, &sc_directly, sc_params.materialized_params_map);
-    if (!res.ok()) {
+    Status res =
+            _parse_request(sc_params.base_tablet, sc_params.new_tablet, &rb_changer, &sc_sorting,
+                           &sc_directly, sc_params.materialized_params_map, *sc_params.desc_tbl);
+
+    auto process_alter_exit = [&]() -> Status {
+        {
+            // save tablet meta here because rowset meta is not saved during add rowset
+            std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
+            sc_params.new_tablet->save_meta();
+        }
+        if (res) {
+            Version test_version(0, end_version);
+            res = sc_params.new_tablet->check_version_integrity(test_version);
+        }
+
+        LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
+                  << "base_tablet=" << sc_params.base_tablet->full_name()
+                  << ", new_tablet=" << sc_params.new_tablet->full_name();
+        return res;
+    };
+
+    if (!res) {
         LOG(WARNING) << "failed to parse the request. res=" << res;
-        goto PROCESS_ALTER_EXIT;
+        return process_alter_exit();
     }
 
     // b. Generate historical data converter
-    if (sc_sorting) {
-        LOG(INFO) << "doing schema change with sorting for base_tablet "
-                  << sc_params.base_tablet->full_name();
-        sc_procedure = new (nothrow) SchemaChangeWithSorting(
-                rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
-    } else if (sc_directly) {
-        LOG(INFO) << "doing schema change directly for base_tablet "
-                  << sc_params.base_tablet->full_name();
-        sc_procedure = new (nothrow) SchemaChangeDirectly(rb_changer);
-    } else {
-        LOG(INFO) << "doing linked schema change for base_tablet "
-                  << sc_params.base_tablet->full_name();
-        sc_procedure = new (nothrow) LinkedSchemaChange(rb_changer);
-    }
-
-    if (sc_procedure == nullptr) {
-        LOG(WARNING) << "failed to malloc SchemaChange. "
-                     << "malloc_size=" << sizeof(SchemaChangeWithSorting);
-        res = Status::OLAPInternalError(OLAP_ERR_MALLOC_ERROR);
-        goto PROCESS_ALTER_EXIT;
-    }
+    auto sc_procedure = get_sc_procedure(rb_changer, sc_sorting, sc_directly);
 
     // c.Convert historical data
     for (auto& rs_reader : sc_params.ref_rowset_readers) {
@@ -1834,19 +1758,20 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
         Status status = new_tablet->create_rowset_writer(
                 rs_reader->version(), VISIBLE,
                 rs_reader->rowset()->rowset_meta()->segments_overlap(), &rowset_writer);
-        if (!status.ok()) {
+        if (!status) {
             res = Status::OLAPInternalError(OLAP_ERR_ROWSET_BUILDER_INIT);
-            goto PROCESS_ALTER_EXIT;
+            return process_alter_exit();
         }
 
-        if ((res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
-                                         sc_params.base_tablet)) != Status::OK()) {
+        if (res = sc_procedure->process(rs_reader, rowset_writer.get(), sc_params.new_tablet,
+                                        sc_params.base_tablet);
+            !res) {
             LOG(WARNING) << "failed to process the version."
                          << " version=" << rs_reader->version().first << "-"
                          << rs_reader->version().second;
             new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                        rowset_writer->rowset_id().to_string());
-            goto PROCESS_ALTER_EXIT;
+            return process_alter_exit();
         }
         new_tablet->data_dir()->remove_pending_ids(ROWSET_ID_PREFIX +
                                                    rowset_writer->rowset_id().to_string());
@@ -1856,7 +1781,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
         RowsetSharedPtr new_rowset = rowset_writer->build();
         if (new_rowset == nullptr) {
             LOG(WARNING) << "failed to build rowset, exit alter process";
-            goto PROCESS_ALTER_EXIT;
+            return process_alter_exit();
         }
         res = sc_params.new_tablet->add_rowset(new_rowset);
         if (res.precise_code() == OLAP_ERR_PUSH_VERSION_ALREADY_EXIST) {
@@ -1865,13 +1790,13 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
                          << rs_reader->version().first << "-" << rs_reader->version().second;
             StorageEngine::instance()->add_unused_rowset(new_rowset);
             res = Status::OK();
-        } else if (!res.ok()) {
+        } else if (!res) {
             LOG(WARNING) << "failed to register new version. "
                          << " tablet=" << sc_params.new_tablet->full_name()
                          << ", version=" << rs_reader->version().first << "-"
                          << rs_reader->version().second;
             StorageEngine::instance()->add_unused_rowset(new_rowset);
-            goto PROCESS_ALTER_EXIT;
+            return process_alter_exit();
         } else {
             VLOG_NOTICE << "register new version. tablet=" << sc_params.new_tablet->full_name()
                         << ", version=" << rs_reader->version().first << "-"
@@ -1882,22 +1807,9 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
                    << " version=" << rs_reader->version().first << "-"
                    << rs_reader->version().second;
     }
-    // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
-PROCESS_ALTER_EXIT : {
-    // save tablet meta here because rowset meta is not saved during add rowset
-    std::lock_guard<std::shared_mutex> new_wlock(sc_params.new_tablet->get_header_lock());
-    sc_params.new_tablet->save_meta();
-}
-    if (res.ok()) {
-        Version test_version(0, end_version);
-        res = sc_params.new_tablet->check_version_integrity(test_version);
-    }
-    SAFE_DELETE(sc_procedure);
 
-    LOG(INFO) << "finish converting rowsets for new_tablet from base_tablet. "
-              << "base_tablet=" << sc_params.base_tablet->full_name()
-              << ", new_tablet=" << sc_params.new_tablet->full_name();
-    return res;
+    // XXX:The SchemaChange state should not be canceled at this time, because the new Delta has to be converted to the old and new Schema version
+    return process_alter_exit();
 }
 
 // @static
@@ -1906,9 +1818,8 @@ Status SchemaChangeHandler::_parse_request(
         TabletSharedPtr base_tablet, TabletSharedPtr new_tablet, RowBlockChanger* rb_changer,
         bool* sc_sorting, bool* sc_directly,
         const std::unordered_map<std::string, AlterMaterializedViewParam>&
-                materialized_function_map) {
-    Status res = Status::OK();
-
+                materialized_function_map,
+        DescriptorTbl desc_tbl) {
     // set column mapping
     for (int i = 0, new_schema_size = new_tablet->tablet_schema().num_columns();
          i < new_schema_size; ++i) {
@@ -1933,11 +1844,10 @@ Status SchemaChangeHandler::_parse_request(
         }
 
         if (materialized_function_map.find(column_name) != materialized_function_map.end()) {
-            AlterMaterializedViewParam mvParam =
-                    materialized_function_map.find(column_name)->second;
+            auto mvParam = materialized_function_map.find(column_name)->second;
             column_mapping->materialized_function = mvParam.mv_expr;
-            std::string origin_column_name = mvParam.origin_column_name;
-            int32_t column_index = base_tablet->field_index(origin_column_name);
+            column_mapping->expr = mvParam.expr;
+            int32_t column_index = base_tablet->field_index(mvParam.origin_column_name);
             if (column_index >= 0) {
                 column_mapping->ref_column = column_index;
                 continue;
@@ -1961,10 +1871,8 @@ Status SchemaChangeHandler::_parse_request(
         if (i < base_tablet->num_short_key_columns()) {
             *sc_directly = true;
         }
-        res = _init_column_mapping(column_mapping, new_column, new_column.default_value());
-        if (!res) {
-            return res;
-        }
+        RETURN_IF_ERROR(
+                _init_column_mapping(column_mapping, new_column, new_column.default_value()));
 
         VLOG_TRACE << "A column with default value will be added after schema changing. "
                    << "column=" << column_name << ", default_value=" << new_column.default_value();
@@ -2000,7 +1908,7 @@ Status SchemaChangeHandler::_parse_request(
 
     // If the sort of key has not been changed but the new keys num is less then base's,
     // the new table should be re agg.
-    // So we also need to set  sc_sorting = true.
+    // So we also need to set sc_sorting = true.
     // A, B, C are keys(sort keys), D is value
     // followings need resort:
     //      old keys:    A   B   C   D
@@ -2025,23 +1933,12 @@ Status SchemaChangeHandler::_parse_request(
         if (column_mapping->ref_column < 0) {
             continue;
         } else {
-            if (new_tablet_schema.column(i).type() !=
-                ref_tablet_schema.column(column_mapping->ref_column).type()) {
-                *sc_directly = true;
-                return Status::OK();
-            } else if ((new_tablet_schema.column(i).type() ==
-                        ref_tablet_schema.column(column_mapping->ref_column).type()) &&
-                       (new_tablet_schema.column(i).length() !=
-                        ref_tablet_schema.column(column_mapping->ref_column).length())) {
-                *sc_directly = true;
-                return Status::OK();
-
-            } else if (new_tablet_schema.column(i).is_bf_column() !=
-                       ref_tablet_schema.column(column_mapping->ref_column).is_bf_column()) {
-                *sc_directly = true;
-                return Status::OK();
-            } else if (new_tablet_schema.column(i).has_bitmap_index() !=
-                       ref_tablet_schema.column(column_mapping->ref_column).has_bitmap_index()) {
+            auto column_new = new_tablet_schema.column(i);
+            auto column_old = ref_tablet_schema.column(column_mapping->ref_column);
+            if (column_new.type() != column_old.type() ||
+                column_new.length() != column_old.length() ||
+                column_new.is_bf_column() != column_old.is_bf_column() ||
+                column_new.has_bitmap_index() != column_old.has_bitmap_index()) {
                 *sc_directly = true;
                 return Status::OK();
             }
@@ -2049,7 +1946,7 @@ Status SchemaChangeHandler::_parse_request(
     }
 
     if (base_tablet->delete_predicates().size() != 0) {
-        //there exists delete condition in header, can't do linked schema change
+        // there exists delete condition in header, can't do linked schema change
         *sc_directly = true;
     }
 
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 5c8757613d..2f820ae79e 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -17,29 +17,17 @@
 
 #pragma once
 
-#include <deque>
-#include <functional>
-#include <queue>
-#include <utility>
-#include <vector>
-
+#include "common/status.h"
 #include "gen_cpp/AgentService_types.h"
 #include "olap/column_mapping.h"
 #include "olap/delete_handler.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/tablet.h"
+#include "vec/columns/column.h"
+#include "vec/core/block.h"
 
 namespace doris {
-// defined in 'field.h'
-class Field;
-class FieldInfo;
-// defined in 'tablet.h'
-class Tablet;
-// defined in 'row_block.h'
-class RowBlock;
-// defined in 'row_cursor.h'
-class RowCursor;
 
 bool to_bitmap(RowCursor* read_helper, RowCursor* write_helper, const TabletColumn& ref_column,
                int field_idx, int ref_field_idx, MemPool* mem_pool);
@@ -50,11 +38,12 @@ bool count_field(RowCursor* read_helper, RowCursor* write_helper, const TabletCo
 
 class RowBlockChanger {
 public:
-    RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler* delete_handler);
+    RowBlockChanger(const TabletSchema& tablet_schema, const DeleteHandler* delete_handler,
+                    DescriptorTbl desc_tbl);
 
-    RowBlockChanger(const TabletSchema& tablet_schema);
+    RowBlockChanger(const TabletSchema& tablet_schema, DescriptorTbl desc_tbl);
 
-    virtual ~RowBlockChanger();
+    ~RowBlockChanger();
 
     ColumnMapping* get_mutable_column_mapping(size_t column_index);
 
@@ -70,6 +59,8 @@ private:
     // delete handler for filtering data which use specified in DELETE_DATA
     const DeleteHandler* _delete_handler = nullptr;
 
+    DescriptorTbl _desc_tbl;
+
     DISALLOW_COPY_AND_ASSIGN(RowBlockChanger);
 };
 
@@ -94,20 +85,60 @@ public:
     SchemaChange() : _filtered_rows(0), _merged_rows(0) {}
     virtual ~SchemaChange() = default;
 
-    virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder,
-                           TabletSharedPtr tablet, TabletSharedPtr base_tablet) = 0;
-
-    void add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; }
-
-    void add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
+    virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                           TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
+        if (rowset_reader->rowset()->empty() || rowset_reader->rowset()->num_rows() == 0) {
+            RETURN_WITH_WARN_IF_ERROR(
+                    rowset_writer->flush(),
+                    Status::OLAPInternalError(OLAP_ERR_INPUT_PARAMETER_ERROR),
+                    fmt::format("create empty version for schema change failed. version= {}-{}",
+                                rowset_writer->version().first, rowset_writer->version().second));
+
+            return Status::OK();
+        }
+
+        _filtered_rows = 0;
+        _merged_rows = 0;
+
+        RETURN_IF_ERROR(_inner_process(rowset_reader, rowset_writer, new_tablet, base_tablet));
+        _add_filtered_rows(rowset_reader->filtered_rows());
+
+        // Check row num changes
+        if (config::row_nums_check && !_check_row_nums(rowset_reader, *rowset_writer)) {
+            return Status::OLAPInternalError(OLAP_ERR_ALTER_STATUS_ERR);
+        }
+
+        LOG(INFO) << "all row nums. source_rows=" << rowset_reader->rowset()->num_rows()
+                  << ", merged_rows=" << merged_rows() << ", filtered_rows=" << filtered_rows()
+                  << ", new_index_rows=" << rowset_writer->num_rows();
+        return Status::OK();
+    }
 
     uint64_t filtered_rows() const { return _filtered_rows; }
 
     uint64_t merged_rows() const { return _merged_rows; }
 
-    void reset_filtered_rows() { _filtered_rows = 0; }
+protected:
+    void _add_filtered_rows(uint64_t filtered_rows) { _filtered_rows += filtered_rows; }
 
-    void reset_merged_rows() { _merged_rows = 0; }
+    void _add_merged_rows(uint64_t merged_rows) { _merged_rows += merged_rows; }
+
+    virtual Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                                  TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) {
+        return Status::NotSupported("inner process unsupported.");
+    };
+
+    bool _check_row_nums(RowsetReaderSharedPtr reader, const RowsetWriter& writer) const {
+        if (reader->rowset()->num_rows() != writer.num_rows() + _merged_rows + _filtered_rows) {
+            LOG(WARNING) << "fail to check row num! "
+                         << "source_rows=" << reader->rowset()->num_rows()
+                         << ", merged_rows=" << merged_rows()
+                         << ", filtered_rows=" << filtered_rows()
+                         << ", new_index_rows=" << writer.num_rows();
+            return false;
+        }
+        return true;
+    }
 
 private:
     uint64_t _filtered_rows;
@@ -117,11 +148,11 @@ private:
 class LinkedSchemaChange : public SchemaChange {
 public:
     explicit LinkedSchemaChange(const RowBlockChanger& row_block_changer)
-            : SchemaChange(), _row_block_changer(row_block_changer) {}
-    ~LinkedSchemaChange() {}
+            : _row_block_changer(row_block_changer) {}
+    ~LinkedSchemaChange() override = default;
 
-    virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer,
-                           TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+    Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                   TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
 
 private:
     const RowBlockChanger& _row_block_changer;
@@ -134,12 +165,12 @@ public:
     // @params tablet           the instance of tablet which has new schema.
     // @params row_block_changer    changer to modify the data of RowBlock
     explicit SchemaChangeDirectly(const RowBlockChanger& row_block_changer);
-    virtual ~SchemaChangeDirectly();
-
-    virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_writer,
-                           TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+    ~SchemaChangeDirectly() override;
 
 private:
+    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                          TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
     const RowBlockChanger& _row_block_changer;
     RowBlockAllocator* _row_block_allocator;
     RowCursor* _cursor;
@@ -154,12 +185,12 @@ class SchemaChangeWithSorting : public SchemaChange {
 public:
     explicit SchemaChangeWithSorting(const RowBlockChanger& row_block_changer,
                                      size_t memory_limitation);
-    virtual ~SchemaChangeWithSorting();
-
-    virtual Status process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* new_rowset_builder,
-                           TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+    ~SchemaChangeWithSorting() override;
 
 private:
+    Status _inner_process(RowsetReaderSharedPtr rowset_reader, RowsetWriter* rowset_writer,
+                          TabletSharedPtr new_tablet, TabletSharedPtr base_tablet) override;
+
     bool _internal_sorting(const std::vector<RowBlock*>& row_block_arr,
                            const Version& temp_delta_versions, TabletSharedPtr new_tablet,
                            SegmentsOverlapPB segments_overlap, RowsetSharedPtr* rowset);
@@ -177,18 +208,26 @@ private:
 
 class SchemaChangeHandler {
 public:
-    static SchemaChangeHandler* instance() {
-        static SchemaChangeHandler instance;
-        return &instance;
-    }
-
-    Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
-                                  RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset);
+    static Status schema_version_convert(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
+                                         RowsetSharedPtr* base_rowset, RowsetSharedPtr* new_rowset,
+                                         DescriptorTbl desc_tbl);
 
     // schema change v2, it will not set alter task in base tablet
-    Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
+    static Status process_alter_tablet_v2(const TAlterTabletReqV2& request);
+
+    static std::unique_ptr<SchemaChange> get_sc_procedure(const RowBlockChanger& rb_changer,
+                                                          bool sc_sorting, bool sc_directly) {
+        if (sc_sorting) {
+            return std::make_unique<SchemaChangeWithSorting>(
+                    rb_changer, config::memory_limitation_per_thread_for_schema_change_bytes);
+        }
+        if (sc_directly) {
+            return std::make_unique<SchemaChangeDirectly>(rb_changer);
+        }
+        return std::make_unique<LinkedSchemaChange>(rb_changer);
+    }
 
-    bool tablet_in_converting(int64_t tablet_id);
+    static bool tablet_in_converting(int64_t tablet_id);
 
 private:
     // Check the status of schema change and clear information between "a pair" of Schema change tables
@@ -196,17 +235,18 @@ private:
     // Returns:
     //  Success: If there is historical information, then clear it if there is no problem; or no historical information
     //  Failure: otherwise, if there is history information and it cannot be emptied (version has not been completed)
-    Status _check_and_clear_schema_change_info(TabletSharedPtr tablet,
-                                               const TAlterTabletReq& request);
+    static Status _check_and_clear_schema_change_info(TabletSharedPtr tablet,
+                                                      const TAlterTabletReq& request);
 
-    Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
-                                       std::vector<Version>* versions_to_be_changed,
-                                       RowsetSharedPtr* max_rowset);
+    static Status _get_versions_to_be_changed(TabletSharedPtr base_tablet,
+                                              std::vector<Version>* versions_to_be_changed,
+                                              RowsetSharedPtr* max_rowset);
 
     struct AlterMaterializedViewParam {
         std::string column_name;
         std::string origin_column_name;
         std::string mv_expr;
+        std::shared_ptr<TExpr> expr;
     };
 
     struct SchemaChangeParams {
@@ -216,31 +256,29 @@ private:
         std::vector<RowsetReaderSharedPtr> ref_rowset_readers;
         DeleteHandler* delete_handler = nullptr;
         std::unordered_map<std::string, AlterMaterializedViewParam> materialized_params_map;
+        DescriptorTbl* desc_tbl = nullptr;
+        ObjectPool pool;
     };
 
-    Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);
+    static Status _do_process_alter_tablet_v2(const TAlterTabletReqV2& request);
 
-    Status _validate_alter_result(TabletSharedPtr new_tablet, const TAlterTabletReqV2& request);
+    static Status _validate_alter_result(TabletSharedPtr new_tablet,
+                                         const TAlterTabletReqV2& request);
 
-    Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
+    static Status _convert_historical_rowsets(const SchemaChangeParams& sc_params);
 
     static Status _parse_request(TabletSharedPtr base_tablet, TabletSharedPtr new_tablet,
                                  RowBlockChanger* rb_changer, bool* sc_sorting, bool* sc_directly,
                                  const std::unordered_map<std::string, AlterMaterializedViewParam>&
-                                         materialized_function_map);
+                                         materialized_function_map,
+                                 DescriptorTbl desc_tbl);
 
     // Initialization Settings for creating a default value
     static Status _init_column_mapping(ColumnMapping* column_mapping,
                                        const TabletColumn& column_schema, const std::string& value);
 
-private:
-    SchemaChangeHandler();
-    virtual ~SchemaChangeHandler();
-    SchemaChangeHandler(const SchemaChangeHandler&) = delete;
-    SchemaChangeHandler& operator=(const SchemaChangeHandler&) = delete;
-
-    std::shared_mutex _mutex;
-    std::unordered_set<int64_t> _tablet_ids_in_converting;
+    static std::shared_mutex _mutex;
+    static std::unordered_set<int64_t> _tablet_ids_in_converting;
 };
 
 using RowBlockDeleter = std::function<void(RowBlock*)>;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 85d8f7e4ad..771069e4ce 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -732,8 +732,7 @@ bool Tablet::can_do_compaction(size_t path_hash, CompactionType compaction_type)
         // Before doing schema change, tablet's rowsets that versions smaller than max converting version will be
         // removed. So, we only need to do the compaction when it is being converted.
         // After being converted, tablet's state will be changed to TABLET_RUNNING.
-        auto schema_change_handler = SchemaChangeHandler::instance();
-        return schema_change_handler->tablet_in_converting(tablet_id());
+        return SchemaChangeHandler::tablet_in_converting(tablet_id());
     }
 
     return true;
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp
index 24496822d3..55ba6ab6e9 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -23,8 +23,6 @@
 
 namespace doris {
 
-using std::to_string;
-
 EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
         : _alter_tablet_req(request) {
     _mem_tracker = MemTracker::create_tracker(
@@ -39,8 +37,7 @@ Status EngineAlterTabletTask::execute() {
     SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
     DorisMetrics::instance()->create_rollup_requests_total->increment(1);
 
-    auto schema_change_handler = SchemaChangeHandler::instance();
-    Status res = schema_change_handler->process_alter_tablet_v2(_alter_tablet_req);
+    Status res = SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
 
     if (!res.ok()) {
         LOG(WARNING) << "failed to do alter task. res=" << res
@@ -53,8 +50,8 @@ Status EngineAlterTabletTask::execute() {
     }
 
     LOG(INFO) << "success to create new alter tablet. res=" << res
-              << " base_tablet_id=" << _alter_tablet_req.base_tablet_id << ", base_schema_hash"
-              << _alter_tablet_req.base_schema_hash
+              << " base_tablet_id=" << _alter_tablet_req.base_tablet_id
+              << ", base_schema_hash=" << _alter_tablet_req.base_schema_hash
               << ", new_tablet_id=" << _alter_tablet_req.new_tablet_id
               << ", new_schema_hash=" << _alter_tablet_req.new_schema_hash;
     return res;
diff --git a/be/src/olap/task/engine_alter_tablet_task.h b/be/src/olap/task/engine_alter_tablet_task.h
index 1a2c0b3efa..7cc97395f1 100644
--- a/be/src/olap/task/engine_alter_tablet_task.h
+++ b/be/src/olap/task/engine_alter_tablet_task.h
@@ -15,8 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H
-#define DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H
+#pragma once
 
 #include "gen_cpp/AgentService_types.h"
 #include "olap/olap_define.h"
@@ -28,11 +27,11 @@ namespace doris {
 // add "Engine" as task prefix to prevent duplicate name with agent task
 class EngineAlterTabletTask : public EngineTask {
 public:
-    virtual Status execute();
+    Status execute() override;
 
 public:
     EngineAlterTabletTask(const TAlterTabletReqV2& alter_tablet_request);
-    ~EngineAlterTabletTask() {}
+    ~EngineAlterTabletTask() = default;
 
 private:
     const TAlterTabletReqV2& _alter_tablet_req;
@@ -41,4 +40,3 @@ private:
 }; // EngineTask
 
 } // namespace doris
-#endif //DORIS_BE_SRC_OLAP_TASK_ENGINE_ALTER_TABLET_TASK_H
\ No newline at end of file
diff --git a/be/src/olap/task/engine_storage_migration_task_v2.cpp b/be/src/olap/task/engine_storage_migration_task_v2.cpp
index fe00536662..118213b657 100644
--- a/be/src/olap/task/engine_storage_migration_task_v2.cpp
+++ b/be/src/olap/task/engine_storage_migration_task_v2.cpp
@@ -50,8 +50,8 @@ Status EngineStorageMigrationTaskV2::execute() {
     }
 
     LOG(INFO) << "success to create new storage migration v2. res=" << res
-              << " base_tablet_id=" << _storage_migration_req.base_tablet_id << ", base_schema_hash"
-              << _storage_migration_req.base_schema_hash
+              << " base_tablet_id=" << _storage_migration_req.base_tablet_id
+              << ", base_schema_hash=" << _storage_migration_req.base_schema_hash
               << ", new_tablet_id=" << _storage_migration_req.new_tablet_id
               << ", new_schema_hash=" << _storage_migration_req.new_schema_hash;
     return res;
diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp
index c7a9a2188c..b7a50b6808 100644
--- a/be/src/olap/tuple_reader.cpp
+++ b/be/src/olap/tuple_reader.cpp
@@ -23,14 +23,10 @@
 #include <unordered_set>
 
 #include "olap/collect_iterator.h"
+#include "olap/olap_common.h"
 #include "olap/row.h"
-#include "olap/row_block.h"
 #include "olap/row_cursor.h"
-#include "olap/rowset/beta_rowset_reader.h"
-#include "olap/schema.h"
-#include "olap/storage_engine.h"
 #include "runtime/mem_pool.h"
-#include "util/date_func.h"
 
 using std::nothrow;
 using std::set;
diff --git a/be/src/olap/tuple_reader.h b/be/src/olap/tuple_reader.h
index 844135a327..7045393e1b 100644
--- a/be/src/olap/tuple_reader.h
+++ b/be/src/olap/tuple_reader.h
@@ -20,26 +20,11 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
-#include <list>
-#include <memory>
-#include <queue>
-#include <sstream>
-#include <stack>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "exprs/bloomfilter_predicate.h"
 #include "olap/collect_iterator.h"
-#include "olap/column_predicate.h"
 #include "olap/delete_handler.h"
-#include "olap/olap_cond.h"
-#include "olap/olap_define.h"
 #include "olap/reader.h"
 #include "olap/row_cursor.h"
 #include "olap/rowset/rowset_reader.h"
-#include "olap/tablet.h"
-#include "util/runtime_profile.h"
 
 namespace doris {
 
diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp
index aa762048b7..4b81fc2cfe 100644
--- a/be/src/runtime/descriptors.cpp
+++ b/be/src/runtime/descriptors.cpp
@@ -319,7 +319,9 @@ std::string TupleDescriptor::debug_string() const {
 RowDescriptor::RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples,
                              const std::vector<bool>& nullable_tuples)
         : _tuple_idx_nullable_map(nullable_tuples) {
-    DCHECK(nullable_tuples.size() == row_tuples.size());
+    DCHECK(nullable_tuples.size() == row_tuples.size())
+            << "nullable_tuples size " << nullable_tuples.size() << " != row_tuples size "
+            << row_tuples.size();
     DCHECK_GT(row_tuples.size(), 0);
     _num_materialized_slots = 0;
     _num_null_slots = 0;
@@ -570,6 +572,7 @@ Status DescriptorTbl::create(ObjectPool* pool, const TDescriptorTable& thrift_tb
         }
 
         (*tbl)->_tuple_desc_map[tdesc.id] = desc;
+        (*tbl)->_row_tuples.emplace_back(tdesc.id);
     }
 
     for (size_t i = 0; i < thrift_tbl.slotDescriptors.size(); ++i) {
@@ -622,16 +625,6 @@ SlotDescriptor* DescriptorTbl::get_slot_descriptor(SlotId id) const {
     }
 }
 
-// return all registered tuple descriptors
-void DescriptorTbl::get_tuple_descs(std::vector<TupleDescriptor*>* descs) const {
-    descs->clear();
-
-    for (TupleDescriptorMap::const_iterator i = _tuple_desc_map.begin(); i != _tuple_desc_map.end();
-         ++i) {
-        descs->push_back(i->second);
-    }
-}
-
 bool SlotDescriptor::layout_equals(const SlotDescriptor& other_desc) const {
     if (type().type != other_desc.type().type) return false;
     if (is_nullable() != other_desc.is_nullable()) return false;
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index a806a537c5..ee18f8a450 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -149,7 +149,7 @@ private:
 class TableDescriptor {
 public:
     TableDescriptor(const TTableDescriptor& tdesc);
-    virtual ~TableDescriptor() {}
+    virtual ~TableDescriptor() = default;
     int num_cols() const { return _num_cols; }
     int num_clustering_cols() const { return _num_clustering_cols; }
     virtual std::string debug_string() const;
@@ -173,14 +173,14 @@ private:
 class OlapTableDescriptor : public TableDescriptor {
 public:
     OlapTableDescriptor(const TTableDescriptor& tdesc);
-    virtual std::string debug_string() const;
+    std::string debug_string() const override;
 };
 
 class SchemaTableDescriptor : public TableDescriptor {
 public:
     SchemaTableDescriptor(const TTableDescriptor& tdesc);
-    virtual ~SchemaTableDescriptor();
-    virtual std::string debug_string() const;
+    ~SchemaTableDescriptor() override;
+    std::string debug_string() const override;
     TSchemaTableType::type schema_table_type() const { return _schema_table_type; }
 
 private:
@@ -190,8 +190,8 @@ private:
 class BrokerTableDescriptor : public TableDescriptor {
 public:
     BrokerTableDescriptor(const TTableDescriptor& tdesc);
-    virtual ~BrokerTableDescriptor();
-    virtual std::string debug_string() const;
+    ~BrokerTableDescriptor() override;
+    std::string debug_string() const override;
 
 private:
 };
@@ -199,8 +199,8 @@ private:
 class HiveTableDescriptor : public TableDescriptor {
 public:
     HiveTableDescriptor(const TTableDescriptor& tdesc);
-    virtual ~HiveTableDescriptor();
-    virtual std::string debug_string() const;
+    ~HiveTableDescriptor() override;
+    std::string debug_string() const override;
 
 private:
 };
@@ -208,8 +208,8 @@ private:
 class IcebergTableDescriptor : public TableDescriptor {
 public:
     IcebergTableDescriptor(const TTableDescriptor& tdesc);
-    virtual ~IcebergTableDescriptor();
-    virtual std::string debug_string() const;
+    ~IcebergTableDescriptor() override;
+    std::string debug_string() const override;
 
 private:
 };
@@ -217,8 +217,8 @@ private:
 class EsTableDescriptor : public TableDescriptor {
 public:
     EsTableDescriptor(const TTableDescriptor& tdesc);
-    virtual ~EsTableDescriptor();
-    virtual std::string debug_string() const;
+    ~EsTableDescriptor() override;
+    std::string debug_string() const override;
 
 private:
 };
@@ -226,7 +226,7 @@ private:
 class MySQLTableDescriptor : public TableDescriptor {
 public:
     MySQLTableDescriptor(const TTableDescriptor& tdesc);
-    virtual std::string debug_string() const;
+    std::string debug_string() const override;
     const std::string mysql_db() const { return _mysql_db; }
     const std::string mysql_table() const { return _mysql_table; }
     const std::string host() const { return _host; }
@@ -248,7 +248,7 @@ private:
 class ODBCTableDescriptor : public TableDescriptor {
 public:
     ODBCTableDescriptor(const TTableDescriptor& tdesc);
-    virtual std::string debug_string() const;
+    std::string debug_string() const override;
     const std::string db() const { return _db; }
     const std::string table() const { return _table; }
     const std::string host() const { return _host; }
@@ -348,22 +348,32 @@ public:
     TableDescriptor* get_table_descriptor(TableId id) const;
     TupleDescriptor* get_tuple_descriptor(TupleId id) const;
     SlotDescriptor* get_slot_descriptor(SlotId id) const;
+    const std::vector<TTupleId>& get_row_tuples() const { return _row_tuples; }
 
     // return all registered tuple descriptors
-    void get_tuple_descs(std::vector<TupleDescriptor*>* descs) const;
+    std::vector<TupleDescriptor*> get_tuple_descs() const {
+        std::vector<TupleDescriptor*> descs;
+
+        for (auto it : _tuple_desc_map) {
+            descs.push_back(it.second);
+        }
+
+        return descs;
+    }
 
     std::string debug_string() const;
 
 private:
-    typedef std::unordered_map<TableId, TableDescriptor*> TableDescriptorMap;
-    typedef std::unordered_map<TupleId, TupleDescriptor*> TupleDescriptorMap;
-    typedef std::unordered_map<SlotId, SlotDescriptor*> SlotDescriptorMap;
+    using TableDescriptorMap = std::unordered_map<TableId, TableDescriptor*>;
+    using TupleDescriptorMap = std::unordered_map<TupleId, TupleDescriptor*>;
+    using SlotDescriptorMap = std::unordered_map<SlotId, SlotDescriptor*>;
 
     TableDescriptorMap _tbl_desc_map;
     TupleDescriptorMap _tuple_desc_map;
     SlotDescriptorMap _slot_desc_map;
+    std::vector<TTupleId> _row_tuples;
 
-    DescriptorTbl() : _tbl_desc_map(), _tuple_desc_map(), _slot_desc_map() {}
+    DescriptorTbl() = default;
 };
 
 // Records positions of tuples within row produced by ExecNode.
@@ -378,6 +388,11 @@ public:
     RowDescriptor(const DescriptorTbl& desc_tbl, const std::vector<TTupleId>& row_tuples,
                   const std::vector<bool>& nullable_tuples);
 
+    static RowDescriptor create_default(const DescriptorTbl& desc_tbl,
+                                        const std::vector<bool>& nullable_tuples) {
+        return RowDescriptor(desc_tbl, desc_tbl.get_row_tuples(), nullable_tuples);
+    }
+
     // standard copy c'tor, made explicit here
     RowDescriptor(const RowDescriptor& desc)
             : _tuple_desc_map(desc._tuple_desc_map),
@@ -399,7 +414,7 @@ public:
     RowDescriptor(const RowDescriptor& lhs_row_desc, const RowDescriptor& rhs_row_desc);
 
     // dummy descriptor, needed for the JNI EvalPredicate() function
-    RowDescriptor() {}
+    RowDescriptor() = default;
 
     // Returns total size in bytes.
     // TODO: also take avg string lengths into account, ie, change this
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 482b56d378..6bbaabd1ea 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -130,6 +130,20 @@ RuntimeState::RuntimeState(const TQueryGlobals& query_globals)
     TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj);
 }
 
+RuntimeState::RuntimeState()
+        : _profile("<unnamed>"),
+          _obj_pool(new ObjectPool()),
+          _data_stream_recvrs_pool(new ObjectPool()),
+          _unreported_error_idx(0),
+          _is_cancelled(false),
+          _per_fragment_instance_idx(0) {
+    _query_options.batch_size = DEFAULT_BATCH_SIZE;
+    _timezone = TimezoneUtils::default_time_zone;
+    _timestamp_ms = 0;
+    TimezoneUtils::find_cctz_time_zone(_timezone, _timezone_obj);
+    _exec_env = ExecEnv::GetInstance();
+}
+
 RuntimeState::~RuntimeState() {
     _block_mgr2.reset();
     // close error log file
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index eed32d8b82..ba07ecbf38 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -75,6 +75,9 @@ public:
     // RuntimeState for executing expr in fe-support.
     RuntimeState(const TQueryGlobals& query_globals);
 
+    // for job task only
+    RuntimeState();
+
     // Empty d'tor to avoid issues with unique_ptr.
     ~RuntimeState();
 
@@ -105,7 +108,7 @@ public:
     std::shared_ptr<ObjectPool> obj_pool_ptr() const { return _obj_pool; }
 
     const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
-    void set_desc_tbl(DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
+    void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
     int batch_size() const { return _query_options.batch_size; }
     bool abort_on_error() const { return _query_options.abort_on_error; }
     bool abort_on_default_limit_exceeded() const {
@@ -188,7 +191,7 @@ public:
     int64_t backend_id() const { return _backend_id; }
 
     void set_be_number(int be_number) { _be_number = be_number; }
-    int be_number(void) { return _be_number; }
+    int be_number(void) const { return _be_number; }
 
     // Sets _process_status with err_msg if no error has been set yet.
     void set_process_status(const std::string& err_msg) {
@@ -239,7 +242,7 @@ public:
 
     void set_load_job_id(int64_t job_id) { _load_job_id = job_id; }
 
-    const int64_t load_job_id() { return _load_job_id; }
+    const int64_t load_job_id() const { return _load_job_id; }
 
     // we only initialize object for load jobs
     void set_load_error_hub_info(const TLoadErrorHubInfo& hub_info) {
@@ -312,17 +315,21 @@ public:
 
     ReservationTracker* instance_buffer_reservation() { return _instance_buffer_reservation.get(); }
 
-    int64_t min_reservation() { return _query_options.min_reservation; }
+    int64_t min_reservation() const { return _query_options.min_reservation; }
 
-    int64_t max_reservation() { return _query_options.max_reservation; }
+    int64_t max_reservation() const { return _query_options.max_reservation; }
 
-    bool disable_stream_preaggregations() { return _query_options.disable_stream_preaggregations; }
+    bool disable_stream_preaggregations() const {
+        return _query_options.disable_stream_preaggregations;
+    }
 
     bool enable_spill() const { return _query_options.enable_spilling; }
 
-    int32_t runtime_filter_wait_time_ms() { return _query_options.runtime_filter_wait_time_ms; }
+    int32_t runtime_filter_wait_time_ms() const {
+        return _query_options.runtime_filter_wait_time_ms;
+    }
 
-    int32_t runtime_filter_max_in_num() { return _query_options.runtime_filter_max_in_num; }
+    int32_t runtime_filter_max_in_num() const { return _query_options.runtime_filter_max_in_num; }
 
     bool enable_vectorized_exec() const { return _query_options.enable_vectorized_engine; }
 
@@ -387,7 +394,7 @@ private:
     // _obj_pool. Because some of object in _obj_pool will use profile when deconstructing.
     RuntimeProfile _profile;
 
-    DescriptorTbl* _desc_tbl;
+    const DescriptorTbl* _desc_tbl;
     std::shared_ptr<ObjectPool> _obj_pool;
 
     // runtime filter
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index b476715612..e5b82fc939 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -24,8 +24,7 @@
 
 namespace doris {
 
-typedef void (*ERRCALLBACK)();
-
+using ERRCALLBACK = void (*)();
 struct ConsumeErrCallBackInfo {
     std::string cancel_msg;
     bool cancel_task; // Whether to cancel the task when the current tracker exceeds the limit.
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index ea8b952f8c..b0b6231cbb 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -166,7 +166,9 @@ public:
 
     /// Appends one element from other column with the same type multiple times.
     virtual void insert_many_from(const IColumn& src, size_t position, size_t length) {
-        for (size_t i = 0; i < length; ++i) insert_from(src, position);
+        for (size_t i = 0; i < length; ++i) {
+            insert_from(src, position);
+        }
     }
 
     /// Appends a batch elements from other column with the same type
@@ -199,6 +201,12 @@ public:
         LOG(FATAL) << "Method insert_many_binary_data is not supported for " << get_name();
     }
 
+    void insert_many_data(const char* pos, size_t length, size_t data_num) {
+        for (size_t i = 0; i < data_num; ++i) {
+            insert_data(pos, length);
+        }
+    }
+
     /// Appends "default value".
     /// Is used when there are need to increase column size, but inserting value doesn't make sense.
     /// For example, ColumnNullable(Nested) absolutely ignores values of nested column if it is marked as NULL.
@@ -206,7 +214,9 @@ public:
 
     /// Appends "default value" multiple times.
     virtual void insert_many_defaults(size_t length) {
-        for (size_t i = 0; i < length; ++i) insert_default();
+        for (size_t i = 0; i < length; ++i) {
+            insert_default();
+        }
     }
 
     virtual void insert_elements(void* elements, size_t num) {
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index aeeb84c679..f929da979d 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -17,12 +17,9 @@
 
 #include "vec/olap/block_reader.h"
 
-#include "olap/row_block.h"
-#include "olap/rowset/beta_rowset_reader.h"
-#include "olap/schema.h"
-#include "olap/storage_engine.h"
+#include "common/status.h"
+#include "olap/olap_common.h"
 #include "runtime/mem_pool.h"
-#include "runtime/mem_tracker.h"
 #include "vec/aggregate_functions/aggregate_function_reader.h"
 #include "vec/olap/vcollect_iterator.h"
 
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 91ae5f670f..ad2d27d1d8 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -19,13 +19,9 @@
 
 #include <parallel_hashmap/phmap.h>
 
-#include "olap/collect_iterator.h"
 #include "olap/reader.h"
 #include "olap/rowset/rowset_reader.h"
-#include "olap/tablet.h"
 #include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/aggregate_functions/aggregate_function_reader.h"
-#include "vec/aggregate_functions/aggregate_function_simple_factory.h"
 #include "vec/olap/vcollect_iterator.h"
 
 namespace doris {
@@ -50,9 +46,6 @@ public:
     }
 
 private:
-    friend class VCollectIterator;
-    friend class DeleteHandler;
-
     // Directly read row from rowset and pass to upper caller. No need to do aggregation.
     // This is usually used for DUPLICATE KEY tables
     Status _direct_next_block(Block* block, MemPool* mem_pool, ObjectPool* agg_pool, bool* eof);
diff --git a/be/test/olap/schema_change_test.cpp b/be/test/olap/schema_change_test.cpp
index 7bbf3533ab..019c7635f8 100644
--- a/be/test/olap/schema_change_test.cpp
+++ b/be/test/olap/schema_change_test.cpp
@@ -996,7 +996,7 @@ TEST_F(TestColumn, ConvertIntToBitmap) {
     TabletSchema mv_tablet_schema;
     mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
 
-    RowBlockChanger row_block_changer(mv_tablet_schema);
+    RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl());
     ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
     column_mapping->ref_column = 0;
     column_mapping = row_block_changer.get_mutable_column_mapping(1);
@@ -1079,7 +1079,7 @@ TEST_F(TestColumn, ConvertCharToHLL) {
     TabletSchema mv_tablet_schema;
     mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
 
-    RowBlockChanger row_block_changer(mv_tablet_schema);
+    RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl());
     ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
     column_mapping->ref_column = 0;
     column_mapping = row_block_changer.get_mutable_column_mapping(1);
@@ -1160,7 +1160,7 @@ TEST_F(TestColumn, ConvertCharToCount) {
     TabletSchema mv_tablet_schema;
     mv_tablet_schema.init_from_pb(mv_tablet_schema_pb);
 
-    RowBlockChanger row_block_changer(mv_tablet_schema);
+    RowBlockChanger row_block_changer(mv_tablet_schema, DescriptorTbl());
     ColumnMapping* column_mapping = row_block_changer.get_mutable_column_mapping(0);
     column_mapping->ref_column = 0;
     column_mapping = row_block_changer.get_mutable_column_mapping(1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
index 9a66f10772..a05863d9ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java
@@ -19,10 +19,13 @@ package org.apache.doris.alter;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CreateMaterializedViewStmt;
+import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.MVColumnItem;
+import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SqlParser;
 import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -363,14 +366,22 @@ public class RollupJobV2 extends AlterJobV2 implements GsonPostProcessable {
                         }
                     }
 
+                    List<Column> fullSchema = tbl.getBaseSchema(true);
+                    DescriptorTable descTable = new DescriptorTable();
+                    for (Column column : fullSchema) {
+                        TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
+                        SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
+                        destSlotDesc.setIsMaterialized(true);
+                        destSlotDesc.setColumn(column);
+                        destSlotDesc.setIsNullable(column.isAllowNull());
+                    }
+
                     List<Replica> rollupReplicas = rollupTablet.getReplicas();
                     for (Replica rollupReplica : rollupReplicas) {
-                        AlterReplicaTask rollupTask = new AlterReplicaTask(
-                                rollupReplica.getBackendId(), dbId, tableId, partitionId,
-                                rollupIndexId, baseIndexId,
-                                rollupTabletId, baseTabletId, rollupReplica.getId(),
-                                rollupSchemaHash, baseSchemaHash,
-                                visibleVersion, jobId, JobType.ROLLUP, defineExprs);
+                        AlterReplicaTask rollupTask = new AlterReplicaTask(rollupReplica.getBackendId(), dbId, tableId,
+                                partitionId, rollupIndexId, baseIndexId, rollupTabletId, baseTabletId,
+                                rollupReplica.getId(), rollupSchemaHash, baseSchemaHash, visibleVersion, jobId,
+                                JobType.ROLLUP, defineExprs, descTable);
                         rollupBatchTask.addTask(rollupTask);
                     }
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
index 411ab275a5..4f38cdafda 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java
@@ -17,6 +17,11 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.analysis.DescriptorTable;
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Database;
@@ -375,13 +380,21 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
 
         tbl.readLock();
         try {
+            Map<String, Column> indexColumnMap = Maps.newHashMap();
+            for (Map.Entry<Long, List<Column>> entry : indexSchemaMap.entrySet()) {
+                for (Column column : entry.getValue()) {
+                    indexColumnMap.put(column.getName(), column);
+                }
+            }
+
             Preconditions.checkState(tbl.getState() == OlapTableState.SCHEMA_CHANGE);
 
             for (long partitionId : partitionIndexMap.rowKeySet()) {
                 Partition partition = tbl.getPartition(partitionId);
                 Preconditions.checkNotNull(partition, partitionId);
 
-                // the schema change task will transform the data before visible version(included).
+                // the schema change task will transform the data before visible
+                // version(included).
                 long visibleVersion = partition.getVisibleVersion();
 
                 Map<Long, MaterializedIndex> shadowIndexMap = partitionIndexMap.row(partitionId);
@@ -389,6 +402,32 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                     long shadowIdxId = entry.getKey();
                     MaterializedIndex shadowIdx = entry.getValue();
 
+                    Map<String, Expr> defineExprs = Maps.newHashMap();
+
+                    List<Column> fullSchema = tbl.getBaseSchema(true);
+                    DescriptorTable descTable = new DescriptorTable();
+                    for (Column column : fullSchema) {
+                        TupleDescriptor destTupleDesc = descTable.createTupleDescriptor();
+                        SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
+                        destSlotDesc.setIsMaterialized(true);
+                        destSlotDesc.setColumn(column);
+                        destSlotDesc.setIsNullable(column.isAllowNull());
+
+                        if (indexColumnMap.containsKey(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName())) {
+                            Column newColumn = indexColumnMap
+                                    .get(SchemaChangeHandler.SHADOW_NAME_PRFIX + column.getName());
+                            if (newColumn.getType() != column.getType()) {
+                                try {
+                                    defineExprs.put(column.getName(),
+                                            new SlotRef(destSlotDesc).castTo(newColumn.getType()));
+                                } catch (AnalysisException e) {
+                                    throw new AlterCancelException(e.getMessage());
+                                }
+                            }
+                        }
+
+                    }
+
                     long originIdxId = indexIdMap.get(shadowIdxId);
                     int shadowSchemaHash = indexSchemaVersionAndHashMap.get(shadowIdxId).schemaHash;
                     int originSchemaHash = tbl.getSchemaHashByIndexId(indexIdMap.get(shadowIdxId));
@@ -398,12 +437,10 @@ public class SchemaChangeJobV2 extends AlterJobV2 {
                         long originTabletId = partitionIndexTabletMap.get(partitionId, shadowIdxId).get(shadowTabletId);
                         List<Replica> shadowReplicas = shadowTablet.getReplicas();
                         for (Replica shadowReplica : shadowReplicas) {
-                            AlterReplicaTask rollupTask = new AlterReplicaTask(
-                                    shadowReplica.getBackendId(), dbId, tableId, partitionId,
-                                    shadowIdxId, originIdxId,
-                                    shadowTabletId, originTabletId, shadowReplica.getId(),
-                                    shadowSchemaHash, originSchemaHash,
-                                    visibleVersion, jobId, JobType.SCHEMA_CHANGE);
+                            AlterReplicaTask rollupTask = new AlterReplicaTask(shadowReplica.getBackendId(), dbId,
+                                    tableId, partitionId, shadowIdxId, originIdxId, shadowTabletId, originTabletId,
+                                    shadowReplica.getId(), shadowSchemaHash, originSchemaHash, visibleVersion, jobId,
+                                    JobType.SCHEMA_CHANGE, defineExprs, descTable);
                             schemaChangeBatchTask.addTask(rollupTask);
                         }
                     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index b965ec4b38..e94e3ef290 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -800,11 +800,8 @@ public class Analyzer {
         }
         result = globalState.descTbl.addSlotDescriptor(d);
         result.setColumn(col);
-        if (col.isAllowNull() || isOuterJoined(d.getId())) {
-            result.setIsNullable(true);
-        } else {
-            result.setIsNullable(false);
-        }
+        result.setIsNullable(col.isAllowNull() || isOuterJoined(d.getId()));
+
         slotRefMap.put(key, result);
         return result;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
index 7dd92af01c..579d9d1512 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java
@@ -352,11 +352,7 @@ public class InsertStmt extends DdlStmt {
                 slotDesc.setIsMaterialized(true);
                 slotDesc.setType(col.getType());
                 slotDesc.setColumn(col);
-                if (col.isAllowNull()) {
-                    slotDesc.setIsNullable(true);
-                } else {
-                    slotDesc.setIsNullable(false);
-                }
+                slotDesc.setIsNullable(col.isAllowNull());
             }
             // will use it during create load job
             indexIdToSchemaHash = olapTable.getIndexIdToSchemaHash();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
index 240edf92fc..b91c839441 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java
@@ -112,11 +112,7 @@ public class LoadingTaskPlanner {
             SlotDescriptor slotDesc = descTable.addSlotDescriptor(destTupleDesc);
             slotDesc.setIsMaterialized(true);
             slotDesc.setColumn(col);
-            if (col.isAllowNull()) {
-                slotDesc.setIsNullable(true);
-            } else {
-                slotDesc.setIsNullable(false);
-            }
+            slotDesc.setIsNullable(col.isAllowNull());
         }
 
         // Generate plan trees
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
index 37bae6bd07..cbc8f64f1f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -881,11 +881,7 @@ public class SparkLoadJob extends BulkLoadJob {
                 SlotDescriptor destSlotDesc = descTable.addSlotDescriptor(destTupleDesc);
                 destSlotDesc.setIsMaterialized(true);
                 destSlotDesc.setColumn(column);
-                if (column.isAllowNull()) {
-                    destSlotDesc.setIsNullable(true);
-                } else {
-                    destSlotDesc.setIsNullable(false);
-                }
+                destSlotDesc.setIsNullable(column.isAllowNull());
             }
             initTBrokerScanRange(descTable, destTupleDesc, columns, brokerDesc);
             initTDescriptorTable(descTable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
index 501f5bcc76..887e2b33e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java
@@ -113,11 +113,7 @@ public class UpdatePlanner extends Planner {
             slotDesc.setIsMaterialized(true);
             slotDesc.setType(col.getType());
             slotDesc.setColumn(col);
-            if (col.isAllowNull()) {
-                slotDesc.setIsNullable(true);
-            } else {
-                slotDesc.setIsNullable(false);
-            }
+            slotDesc.setIsNullable(col.isAllowNull());
         }
         targetTupleDesc.computeStatAndMemLayout();
         return targetTupleDesc;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
index f25b01d2f1..4235ce0ded 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AlterReplicaTask.java
@@ -18,6 +18,7 @@
 package org.apache.doris.task;
 
 import org.apache.doris.alter.AlterJobV2;
+import org.apache.doris.analysis.DescriptorTable;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.thrift.TAlterMaterializedViewParam;
@@ -46,21 +47,16 @@ public class AlterReplicaTask extends AgentTask {
     private AlterJobV2.JobType jobType;
 
     private Map<String, Expr> defineExprs;
-
-    public AlterReplicaTask(long backendId, long dbId, long tableId,
-                            long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
-                            long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
-                            long version, long jobId, AlterJobV2.JobType jobType) {
-        this(backendId, dbId, tableId, partitionId,
-                rollupIndexId, baseIndexId, rollupTabletId,
-                baseTabletId, newReplicaId, newSchemaHash, baseSchemaHash,
-                version, jobId, jobType, null);
-    }
-
-    public AlterReplicaTask(long backendId, long dbId, long tableId,
-            long partitionId, long rollupIndexId, long baseIndexId, long rollupTabletId,
-            long baseTabletId, long newReplicaId, int newSchemaHash, int baseSchemaHash,
-            long version, long jobId, AlterJobV2.JobType jobType,  Map<String, Expr> defineExprs) {
+    private DescriptorTable descTable;
+
+    /**
+     * AlterReplicaTask constructor.
+     *
+     */
+    public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
+            long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
+            int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
+            DescriptorTable descTable) {
         super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);
 
         this.baseTabletId = baseTabletId;
@@ -74,6 +70,7 @@ public class AlterReplicaTask extends AgentTask {
 
         this.jobType = jobType;
         this.defineExprs = defineExprs;
+        this.descTable = descTable;
     }
 
     public long getBaseTabletId() {
@@ -117,6 +114,7 @@ public class AlterReplicaTask extends AgentTask {
                 req.addToMaterializedViewParams(mvParam);
             }
         }
+        req.setDescTbl(descTable.toThrift());
         return req;
     }
 }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
index 44e6e2ec62..7ffc14dd06 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/planner/StreamLoadScanNodeTest.java
@@ -182,11 +182,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         TStreamLoadPutRequest request = getBaseRequest();
@@ -230,11 +226,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         TStreamLoadPutRequest request = getBaseRequest();
@@ -259,11 +251,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         TStreamLoadPutRequest request = getBaseRequest();
@@ -288,11 +276,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -332,11 +316,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -379,11 +359,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -434,11 +410,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         TStreamLoadPutRequest request = getBaseRequest();
@@ -464,11 +436,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         TStreamLoadPutRequest request = getBaseRequest();
@@ -494,11 +462,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -548,11 +512,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -594,11 +554,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -646,11 +602,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -701,11 +653,7 @@ public class StreamLoadScanNodeTest {
             SlotDescriptor slot = descTbl.addSlotDescriptor(dstDesc);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -757,11 +705,7 @@ public class StreamLoadScanNodeTest {
             System.out.println(column);
             slot.setColumn(column);
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
@@ -823,11 +767,7 @@ public class StreamLoadScanNodeTest {
             slot.setColumn(column);
 
             slot.setIsMaterialized(true);
-            if (column.isAllowNull()) {
-                slot.setIsNullable(true);
-            } else {
-                slot.setIsNullable(false);
-            }
+            slot.setIsNullable(column.isAllowNull());
         }
 
         new Expectations() {
diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift
index e695d7a737..740db1fd08 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -151,6 +151,7 @@ struct TAlterTabletReqV2 {
     6: optional Types.TVersionHash alter_version_hash // Deprecated
     7: optional list<TAlterMaterializedViewParam> materialized_view_params
     8: optional TAlterTabletType alter_tablet_type = TAlterTabletType.SCHEMA_CHANGE
+    9: optional Descriptors.TDescriptorTable desc_tbl
 }
 
 struct TAlterMaterializedViewParam {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org