Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 03:12:49 UTC

[doris] branch branch-1.2-lts updated: [feature wip](multi catalog)Support iceberg schema evolution. (#15836)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 96a4125a94 [feature wip](multi catalog)Support iceberg schema evolution. (#15836)
96a4125a94 is described below

commit 96a4125a94b3505f610b58b2272e78059ab6c218
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Fri Jan 20 12:57:36 2023 +0800

    [feature wip](multi catalog)Support iceberg schema evolution. (#15836)
    
    Support iceberg schema evolution for the parquet file format.
    Iceberg uses a unique id for each column to support schema evolution.
    To support this feature in Doris, the FE side needs to get the current column id for each column and send the ids to the BE side.
    The BE reads the column ids from the parquet key_value_metadata, sets the changed column names in the Block to match the names in the parquet file before reading data, and sets the names back after reading.
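
    The id-based rename resolution can be illustrated with a minimal, standalone
    C++ sketch (not the committed code; the inputs below are hypothetical, and the
    real implementation in iceberg_reader.cpp parses the 'iceberg.schema' JSON
    stored in the parquet key_value_metadata with rapidjson):

        // Sketch: resolve renamed columns by their unique iceberg column id.
        // The FE sends {column id -> current table column name}; the parquet
        // file's metadata yields {column id -> column name at write time}.
        #include <iostream>
        #include <string>
        #include <unordered_map>

        int main() {
            // Current table schema from the FE (id -> table column name).
            std::unordered_map<int, std::string> col_id_to_table_name = {{1, "id"}, {2, "col1_new"}};
            // Schema recorded in the parquet file when it was written (id -> file column name).
            std::unordered_map<int, std::string> col_id_to_file_name = {{1, "id"}, {2, "col1"}};

            std::unordered_map<std::string, std::string> table_to_file; // e.g. col1_new -> col1
            std::unordered_map<std::string, std::string> file_to_table; // e.g. col1 -> col1_new
            bool has_schema_change = false;

            for (const auto& [id, file_name] : col_id_to_file_name) {
                auto it = col_id_to_table_name.find(id);
                if (it == col_id_to_table_name.end()) {
                    has_schema_change = true; // column was dropped from the table
                    continue;
                }
                table_to_file.emplace(it->second, file_name);
                file_to_table.emplace(file_name, it->second);
                if (it->second != file_name) {
                    has_schema_change = true; // column renamed after the file was written
                }
            }

            for (const auto& [table_name, file_name] : table_to_file) {
                std::cout << "table column '" << table_name << "' reads file column '" << file_name << "'\n";
            }
            std::cout << "schema change detected: " << std::boolalpha << has_schema_change << "\n";
            return 0;
        }

    In the patch below, the same two maps drive both the renaming of Block columns
    in IcebergTableReader::get_next_block and the rewriting of predicate column
    names and value ranges before the parquet reader is initialized.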
---
 be/src/vec/core/block.h                            |   3 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp |  43 +++++-
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  15 ++-
 be/src/vec/exec/format/table/iceberg_reader.cpp    | 145 ++++++++++++++++++++-
 be/src/vec/exec/format/table/iceberg_reader.h      |  28 ++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  14 +-
 be/src/vec/exec/scan/vfile_scanner.h               |   6 +-
 be/test/vec/exec/parquet/parquet_reader_test.cpp   |   4 +-
 .../doris/catalog/HiveMetaStoreClientHelper.java   |  27 ++++
 .../doris/datasource/HMSExternalCatalog.java       |  27 ++++
 .../doris/planner/external/HiveScanProvider.java   |  10 --
 .../planner/external/IcebergScanProvider.java      |  20 +--
 .../iceberg/iceberg_schema_evolution.out           |  79 +++++++++++
 .../iceberg/iceberg_schema_evolution.groovy        |  67 ++++++++++
 14 files changed, 435 insertions(+), 53 deletions(-)

diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index d9dba69407..db508f2b2f 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -102,6 +102,8 @@ public:
         std::swap(data, new_data);
     }
 
+    void initialize_index_by_name();
+
     /// References are invalidated after calling functions above.
     ColumnWithTypeAndName& get_by_position(size_t position) { return data[position]; }
     const ColumnWithTypeAndName& get_by_position(size_t position) const { return data[position]; }
@@ -373,7 +375,6 @@ public:
 
 private:
     void erase_impl(size_t position);
-    void initialize_index_by_name();
     bool is_column_data_null(const doris::TypeDescriptor& type_desc, const StringRef& data_ref,
                              const IColumn* column_with_type_and_name, int row);
     void deep_copy_slot(void* dst, MemPool* pool, const doris::TypeDescriptor& type_desc,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index a27eddce71..26ab574486 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -23,6 +23,7 @@
 #include "io/file_factory.h"
 #include "parquet_pred_cmp.h"
 #include "parquet_thrift_util.h"
+#include "rapidjson/document.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
@@ -150,21 +151,46 @@ Status ParquetReader::_open_file() {
     return Status::OK();
 }
 
+// Get iceberg col id to col name map stored in parquet metadata key values.
+// This is for iceberg schema evolution.
+std::vector<tparquet::KeyValue> ParquetReader::get_metadata_key_values() {
+    return _t_metadata->key_value_metadata;
+}
+
+Status ParquetReader::open() {
+    SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
+    RETURN_IF_ERROR(_open_file());
+    _t_metadata = &_file_metadata->to_thrift();
+    return Status::OK();
+}
+
 Status ParquetReader::init_reader(
-        const std::vector<std::string>& column_names,
+        const std::vector<std::string>& all_column_names,
+        const std::vector<std::string>& missing_column_names,
         std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
         VExprContext* vconjunct_ctx, bool filter_groups) {
     SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
-    RETURN_IF_ERROR(_open_file());
-    _column_names = &column_names;
-    _t_metadata = &_file_metadata->to_thrift();
     _total_groups = _t_metadata->row_groups.size();
     if (_total_groups == 0) {
         return Status::EndOfFile("Empty Parquet File");
     }
+    // all_column_names are all the columns required by the user SQL.
+    // missing_column_names are the columns required by the user SQL but not present in the parquet file,
+    // e.g. a column added to the table after this parquet file was written.
+    _column_names = &all_column_names;
     auto schema_desc = _file_metadata->schema();
     for (int i = 0; i < schema_desc.size(); ++i) {
-        _map_column.emplace(schema_desc.get_column(i)->name, i);
+        auto name = schema_desc.get_column(i)->name;
+        // If a column in the parquet file is included in all_column_names and not in missing_column_names,
+        // add it to _map_column, which means the reader should read the data of this column.
+        // The check against missing_column_names handles the case where a dropped column is added back
+        // to the table with the same name; the data of that column in this file must not be read.
+        if (find(all_column_names.begin(), all_column_names.end(), name) !=
+                    all_column_names.end() &&
+            find(missing_column_names.begin(), missing_column_names.end(), name) ==
+                    missing_column_names.end()) {
+            _map_column.emplace(name, i);
+        }
     }
     _colname_to_value_range = colname_to_value_range;
     RETURN_IF_ERROR(_init_read_columns());
@@ -182,7 +208,12 @@ Status ParquetReader::set_fill_columns(
     std::unordered_map<std::string, uint32_t> predicate_columns;
     std::function<void(VExpr * expr)> visit_slot = [&](VExpr* expr) {
         if (VSlotRef* slot_ref = typeid_cast<VSlotRef*>(expr)) {
-            predicate_columns.emplace(slot_ref->expr_name(), slot_ref->column_id());
+            auto expr_name = slot_ref->expr_name();
+            auto iter = _table_col_to_file_col.find(expr_name);
+            if (iter != _table_col_to_file_col.end()) {
+                expr_name = iter->second;
+            }
+            predicate_columns.emplace(expr_name, slot_ref->column_id());
             if (slot_ref->column_id() == 0) {
                 _lazy_read_ctx.resize_first_column = false;
             }
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 5ab9f19024..f174e695e2 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -63,13 +63,11 @@ public:
     // for test
     void set_file_reader(FileReader* file_reader) { _file_reader.reset(file_reader); }
 
-    Status init_reader(const std::vector<std::string>& column_names, bool filter_groups = true) {
-        // without predicate
-        return init_reader(column_names, nullptr, nullptr, filter_groups);
-    }
+    Status open();
 
     Status init_reader(
-            const std::vector<std::string>& column_names,
+            const std::vector<std::string>& all_column_names,
+            const std::vector<std::string>& missing_column_names,
             std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
             VExprContext* vconjunct_ctx, bool filter_groups = true);
 
@@ -100,6 +98,11 @@ public:
                     partition_columns,
             const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
 
+    std::vector<tparquet::KeyValue> get_metadata_key_values();
+    void set_table_to_file_col_map(std::unordered_map<std::string, std::string>& map) {
+        _table_col_to_file_col = map;
+    }
+
 private:
     struct ParquetProfile {
         RuntimeProfile::Counter* filtered_row_groups;
@@ -161,6 +164,8 @@ private:
     bool _row_group_eof = true;
     int32_t _total_groups;                  // num of groups(stripes) of a parquet(orc) file
     std::map<std::string, int> _map_column; // column-name <---> column-index
+    // table column name to file column name map. For iceberg schema evolution.
+    std::unordered_map<std::string, std::string> _table_col_to_file_col;
     std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
     std::vector<ParquetReadColumn> _read_columns;
     RowRange _whole_range = RowRange(0, 0);
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index f506597fc4..8715c938a6 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -17,6 +17,8 @@
 
 #include "iceberg_reader.h"
 
+#include <rapidjson/document.h>
+
 #include "vec/common/assert_cast.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -55,8 +57,52 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim
             ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
 }
 
+Status IcebergTableReader::init_reader(
+        std::vector<std::string>& file_col_names,
+        std::unordered_map<int, std::string>& col_id_name_map,
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+        VExprContext* vconjunct_ctx) {
+    ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
+    _col_id_name_map = col_id_name_map;
+    _file_col_names = file_col_names;
+    _colname_to_value_range = colname_to_value_range;
+    auto parquet_meta_kv = parquet_reader->get_metadata_key_values();
+    _gen_col_name_maps(parquet_meta_kv);
+    _gen_file_col_names();
+    _gen_new_colname_to_value_range();
+    parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
+    Status status = parquet_reader->init_reader(_all_required_col_names, _not_in_file_col_names,
+                                                &_new_colname_to_value_range, vconjunct_ctx);
+    return status;
+}
+
 Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
-    return _file_format_reader->get_next_block(block, read_rows, eof);
+    // To support iceberg schema evolution, we change the column names in the block to
+    // match the column names in the parquet file before reading data, and set the
+    // names back to the table column names before returning the block.
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _table_col_to_file_col.find(col.name);
+            if (iter != _table_col_to_file_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    auto res = _file_format_reader->get_next_block(block, read_rows, eof);
+    // Set the names back to the table column names before returning the block.
+    if (_has_schema_change) {
+        for (int i = 0; i < block->columns(); i++) {
+            ColumnWithTypeAndName& col = block->get_by_position(i);
+            auto iter = _file_col_to_table_col.find(col.name);
+            if (iter != _file_col_to_table_col.end()) {
+                col.name = iter->second;
+            }
+        }
+        block->initialize_index_by_name();
+    }
+    return res;
 }
 
 Status IcebergTableReader::set_fill_columns(
@@ -133,8 +179,8 @@ Status IcebergTableReader::_position_delete(
                 delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
                 init_schema = true;
             }
-            create_status =
-                    delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false);
+            create_status = delete_reader.init_reader(delete_file_col_names, _not_in_file_col_names,
+                                                      nullptr, nullptr, false);
             if (!create_status.ok()) {
                 return nullptr;
             }
@@ -350,4 +396,97 @@ void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
     }
 }
 
+/*
+ * To support schema evolution, Iceberg writes the column id to column name map into the
+ * parquet file key_value_metadata.
+ * This function compares the table schema from the FE (_col_id_name_map) with
+ * the schema in the key_value_metadata of the current parquet file and generates two maps
+ * for future use:
+ * 1. table column name to parquet column name.
+ * 2. parquet column name to table column name.
+ * For example, suppose the parquet file has a column 'col1' and, after this file was written,
+ * iceberg renamed the column to 'col1_new'.
+ * The two maps would contain:
+ * 1. col1_new -> col1
+ * 2. col1 -> col1_new
+ */
+Status IcebergTableReader::_gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv) {
+    for (int i = 0; i < parquet_meta_kv.size(); ++i) {
+        tparquet::KeyValue kv = parquet_meta_kv[i];
+        if (kv.key == "iceberg.schema") {
+            std::string schema = kv.value;
+            rapidjson::Document json;
+            json.Parse(schema.c_str());
+
+            if (json.HasMember("fields")) {
+                rapidjson::Value& fields = json["fields"];
+                if (fields.IsArray()) {
+                    for (int j = 0; j < fields.Size(); j++) {
+                        rapidjson::Value& e = fields[j];
+                        rapidjson::Value& id = e["id"];
+                        rapidjson::Value& name = e["name"];
+                        std::string name_string = name.GetString();
+                        transform(name_string.begin(), name_string.end(), name_string.begin(),
+                                  ::tolower);
+                        auto iter = _col_id_name_map.find(id.GetInt());
+                        if (iter != _col_id_name_map.end()) {
+                            _table_col_to_file_col.emplace(iter->second, name_string);
+                            _file_col_to_table_col.emplace(name_string, iter->second);
+                            if (name_string != iter->second) {
+                                _has_schema_change = true;
+                            }
+                        } else {
+                            _has_schema_change = true;
+                        }
+                    }
+                }
+            }
+            break;
+        }
+    }
+    return Status::OK();
+}
+
+/*
+ * Generate _all_required_col_names and _not_in_file_col_names.
+ *
+ * _all_required_col_names contains all the columns required by the user SQL.
+ * If a column name has been modified after the data file was written,
+ * the old name used in the data file is put into _all_required_col_names.
+ *
+ * _not_in_file_col_names contains the columns required by the user SQL but not present in the data file,
+ * e.g. new columns added after this data file was written.
+ * Columns added with names previously used by dropped columns should be treated as missing columns,
+ * so they also belong in _not_in_file_col_names.
+ */
+void IcebergTableReader::_gen_file_col_names() {
+    _all_required_col_names.clear();
+    _not_in_file_col_names.clear();
+    for (int i = 0; i < _file_col_names.size(); ++i) {
+        auto name = _file_col_names[i];
+        auto iter = _table_col_to_file_col.find(name);
+        if (iter == _table_col_to_file_col.end()) {
+            _all_required_col_names.emplace_back(name);
+            _not_in_file_col_names.emplace_back(name);
+        } else {
+            _all_required_col_names.emplace_back(iter->second);
+        }
+    }
+}
+
+/*
+ * Generate _new_colname_to_value_range by replacing the column names in
+ * _colname_to_value_range with the column names in the data file.
+ */
+void IcebergTableReader::_gen_new_colname_to_value_range() {
+    for (auto it = _colname_to_value_range->begin(); it != _colname_to_value_range->end(); it++) {
+        auto iter = _table_col_to_file_col.find(it->first);
+        if (iter == _table_col_to_file_col.end()) {
+            _new_colname_to_value_range.emplace(it->first, it->second);
+        } else {
+            _new_colname_to_value_range.emplace(iter->second, it->second);
+        }
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 301546b6df..bf7a361370 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -19,6 +19,7 @@
 
 #include <queue>
 
+#include "exec/olap_common.h"
 #include "table_format_reader.h"
 #include "vec/columns/column_dictionary.h"
 #include "vec/exec/format/generic_reader.h"
@@ -51,6 +52,12 @@ public:
     Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
                        std::unordered_set<std::string>* missing_cols) override;
 
+    Status init_reader(
+            std::vector<std::string>& file_col_names,
+            std::unordered_map<int, std::string>& col_id_name_map,
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+            VExprContext* vconjunct_ctx);
+
     enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
 
 private:
@@ -76,6 +83,10 @@ private:
 
     PositionDeleteRange _get_range(const ColumnString& file_path_column);
 
+    Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv);
+    void _gen_file_col_names();
+    void _gen_new_colname_to_value_range();
+
     RuntimeProfile* _profile;
     RuntimeState* _state;
     const TFileScanRangeParams& _params;
@@ -83,6 +94,23 @@ private:
     KVCache<std::string>& _kv_cache;
     IcebergProfile _iceberg_profile;
     std::vector<int64_t> _delete_rows;
+    // col names from _file_slot_descs
+    std::vector<std::string> _file_col_names;
+    // file column name to table column name map. For iceberg schema evolution.
+    std::unordered_map<std::string, std::string> _file_col_to_table_col;
+    // table column name to file column name map. For iceberg schema evolution.
+    std::unordered_map<std::string, std::string> _table_col_to_file_col;
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    // copy from _colname_to_value_range with new column name that is in parquet file, to support schema evolution.
+    std::unordered_map<std::string, ColumnValueRangeType> _new_colname_to_value_range;
+    // column id to name map. Collect from FE slot descriptor.
+    std::unordered_map<int, std::string> _col_id_name_map;
+    // col names in the parquet file
+    std::vector<std::string> _all_required_col_names;
+    // col names in table but not in parquet file
+    std::vector<std::string> _not_in_file_col_names;
+
+    bool _has_schema_change = false;
 };
 
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index c1f85cf01b..8619719194 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -484,20 +484,24 @@ Status VFileScanner::_get_next_reader() {
             ParquetReader* parquet_reader =
                     new ParquetReader(_profile, _params, range, _state->query_options().batch_size,
                                       const_cast<cctz::time_zone*>(&_state->timezone_obj()));
+            RETURN_IF_ERROR(parquet_reader->open());
             if (!_is_load && _push_down_expr == nullptr && _vconjunct_ctx != nullptr) {
                 RETURN_IF_ERROR(_vconjunct_ctx->clone(_state, &_push_down_expr));
                 _discard_conjuncts();
             }
-            init_status = parquet_reader->init_reader(_file_col_names, _colname_to_value_range,
-                                                      _push_down_expr);
             if (range.__isset.table_format_params &&
                 range.table_format_params.table_format_type == "iceberg") {
                 IcebergTableReader* iceberg_reader =
                         new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
                                                _params, range, _kv_cache);
+                iceberg_reader->init_reader(_file_col_names, _col_id_name_map,
+                                            _colname_to_value_range, _push_down_expr);
                 RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
                 _cur_reader.reset((GenericReader*)iceberg_reader);
             } else {
+                std::vector<std::string> place_holder;
+                init_status = parquet_reader->init_reader(_file_col_names, place_holder,
+                                                          _colname_to_value_range, _push_down_expr);
                 _cur_reader.reset((GenericReader*)parquet_reader);
             }
             break;
@@ -639,10 +643,10 @@ Status VFileScanner::_init_expr_ctxes() {
         }
         if (slot_info.is_file_slot) {
             _file_slot_descs.emplace_back(it->second);
-            auto iti = full_src_index_map.find(slot_id);
-            _file_slot_index_map.emplace(slot_id, iti->second);
-            _file_slot_name_map.emplace(it->second->col_name(), iti->second);
             _file_col_names.push_back(it->second->col_name());
+            if (it->second->col_unique_id() > 0) {
+                _col_id_name_map.emplace(it->second->col_unique_id(), it->second->col_name());
+            }
         } else {
             _partition_slot_descs.emplace_back(it->second);
             if (_is_load) {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 48bbfbf3ad..aa579232e0 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -68,12 +68,10 @@ protected:
     std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
     // File source slot descriptors
     std::vector<SlotDescriptor*> _file_slot_descs;
-    // File slot id to index in _file_slot_descs
-    std::unordered_map<SlotId, int> _file_slot_index_map;
-    // file col name to index in _file_slot_descs
-    std::map<std::string, int> _file_slot_name_map;
     // col names from _file_slot_descs
     std::vector<std::string> _file_col_names;
+    // column id to name map. Collect from FE slot descriptor.
+    std::unordered_map<int, std::string> _col_id_name_map;
 
     // Partition source slot descriptors
     std::vector<SlotDescriptor*> _partition_slot_descs;
diff --git a/be/test/vec/exec/parquet/parquet_reader_test.cpp b/be/test/vec/exec/parquet/parquet_reader_test.cpp
index 1424cf3203..078736d9c3 100644
--- a/be/test/vec/exec/parquet/parquet_reader_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_reader_test.cpp
@@ -96,6 +96,7 @@ TEST_F(ParquetReaderTest, normal) {
     TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
     auto tuple_desc = desc_tbl->get_tuple_descriptor(0);
     std::vector<std::string> column_names;
+    std::vector<std::string> missing_column_names;
     for (int i = 0; i < slot_descs.size(); i++) {
         column_names.push_back(slot_descs[i]->col_name());
     }
@@ -112,7 +113,8 @@ TEST_F(ParquetReaderTest, normal) {
     runtime_state.init_mem_trackers();
 
     std::unordered_map<std::string, ColumnValueRangeType> colname_to_value_range;
-    p_reader->init_reader(column_names, nullptr, nullptr);
+    p_reader->open();
+    p_reader->init_reader(column_names, missing_column_names, nullptr, nullptr);
     std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
             partition_columns;
     std::unordered_map<std::string, VExprContext*> missing_columns;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 0e88ec9e0d..fd7c27760b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -32,6 +32,7 @@ import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.backup.BlobStorage;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
@@ -43,8 +44,10 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Queues;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -64,6 +67,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -74,6 +78,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Deque;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -812,6 +817,28 @@ public class HiveMetaStoreClientHelper {
         }
         return output.toString();
     }
+
+    public static org.apache.iceberg.Table getIcebergTable(HMSExternalTable table) {
+        String metastoreUri = table.getMetastoreUri();
+        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
+        Configuration conf = getConfiguration(table);
+        hiveCatalog.setConf(conf);
+        // initialize hive catalog
+        Map<String, String> catalogProperties = new HashMap<>();
+        catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, metastoreUri);
+        catalogProperties.put("uri", metastoreUri);
+        hiveCatalog.initialize("hive", catalogProperties);
+
+        return hiveCatalog.loadTable(TableIdentifier.of(table.getDbName(), table.getName()));
+    }
+
+    public static Configuration getConfiguration(HMSExternalTable table) {
+        Configuration conf = new HdfsConfiguration();
+        for (Map.Entry<String, String> entry : table.getHadoopProperties().entrySet()) {
+            conf.set(entry.getKey(), entry.getValue());
+        }
+        return conf;
+    }
 }
 
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
index 70ba9f9aa9..967a28c9ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/HMSExternalCatalog.java
@@ -25,6 +25,7 @@ import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.catalog.external.HMSExternalDatabase;
+import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.Config;
 import org.apache.doris.datasource.hive.PooledHiveMetaStoreClient;
 import org.apache.doris.datasource.hive.event.MetastoreNotificationFetchException;
@@ -37,12 +38,15 @@ import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEventResponse;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * External catalog for hive metastore compatible data sources.
@@ -173,6 +177,16 @@ public class HMSExternalCatalog extends ExternalCatalog {
     public List<Column> getSchema(String dbName, String tblName) {
         makeSureInitialized();
         List<FieldSchema> schema = getClient().getSchema(dbName, tblName);
+        Optional<ExternalDatabase> db = getDb(dbName);
+        if (db.isPresent()) {
+            Optional table = db.get().getTable(tblName);
+            if (table.isPresent()) {
+                HMSExternalTable hmsTable = (HMSExternalTable) table.get();
+                if (hmsTable.getDlaType().equals(HMSExternalTable.DLAType.ICEBERG)) {
+                    return getIcebergSchema(hmsTable, schema);
+                }
+            }
+        }
         List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.size());
         for (FieldSchema field : schema) {
             tmpSchema.add(new Column(field.getName(),
@@ -182,6 +196,19 @@ public class HMSExternalCatalog extends ExternalCatalog {
         return tmpSchema;
     }
 
+    private List<Column> getIcebergSchema(HMSExternalTable table, List<FieldSchema> hmsSchema) {
+        Table icebergTable = HiveMetaStoreClientHelper.getIcebergTable(table);
+        Schema schema = icebergTable.schema();
+        List<Column> tmpSchema = Lists.newArrayListWithCapacity(hmsSchema.size());
+        for (FieldSchema field : hmsSchema) {
+            tmpSchema.add(new Column(field.getName(),
+                    HiveMetaStoreClientHelper.hiveTypeToDorisType(field.getType()), true, null,
+                    true, null, field.getComment(), true, null,
+                    schema.caseInsensitiveFindField(field.getName()).fieldId()));
+        }
+        return tmpSchema;
+    }
+
     public void setLastSyncedEventId(long lastSyncedEventId) {
         this.lastSyncedEventId = lastSyncedEventId;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 929bbec57d..340597f6da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -52,8 +52,6 @@ import org.apache.doris.thrift.TFileType;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.mapred.FileSplit;
@@ -209,14 +207,6 @@ public class HiveScanProvider extends HMSTableScanProvider {
         allFiles.addAll(files);
     }
 
-    protected Configuration getConfiguration() {
-        Configuration conf = new HdfsConfiguration();
-        for (Map.Entry<String, String> entry : hmsTable.getHadoopProperties().entrySet()) {
-            conf.set(entry.getKey(), entry.getValue());
-        }
-        return conf;
-    }
-
     public int getTotalPartitionNum() {
         return totalPartitionNum;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index 46adddfde6..ca68f755d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -20,7 +20,7 @@ package org.apache.doris.planner.external;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.HMSResource;
+import org.apache.doris.catalog.HiveMetaStoreClientHelper;
 import org.apache.doris.catalog.external.HMSExternalTable;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
@@ -33,7 +33,6 @@ import org.apache.doris.thrift.TIcebergDeleteFileDesc;
 import org.apache.doris.thrift.TIcebergFileDesc;
 import org.apache.doris.thrift.TTableFormatFileDesc;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.iceberg.BaseTable;
@@ -43,14 +42,12 @@ import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
-import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.types.Conversions;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -134,7 +131,7 @@ public class IcebergScanProvider extends HiveScanProvider {
             }
         }
 
-        org.apache.iceberg.Table table = getIcebergTable();
+        org.apache.iceberg.Table table = HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
         TableScan scan = table.newScan();
         for (Expression predicate : expressions) {
             scan = scan.filter(predicate);
@@ -181,19 +178,6 @@ public class IcebergScanProvider extends HiveScanProvider {
         return filters;
     }
 
-    private org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException {
-        org.apache.iceberg.hive.HiveCatalog hiveCatalog = new org.apache.iceberg.hive.HiveCatalog();
-        Configuration conf = getConfiguration();
-        hiveCatalog.setConf(conf);
-        // initialize hive catalog
-        Map<String, String> catalogProperties = new HashMap<>();
-        catalogProperties.put(HMSResource.HIVE_METASTORE_URIS, getMetaStoreUrl());
-        catalogProperties.put("uri", getMetaStoreUrl());
-        hiveCatalog.initialize("hive", catalogProperties);
-
-        return hiveCatalog.loadTable(TableIdentifier.of(hmsTable.getDbName(), hmsTable.getName()));
-    }
-
     @Override
     public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
         return Collections.emptyList();
diff --git a/regression-test/data/external_catalog_p0/iceberg/iceberg_schema_evolution.out b/regression-test/data/external_catalog_p0/iceberg/iceberg_schema_evolution.out
new file mode 100644
index 0000000000..dba805ca6d
--- /dev/null
+++ b/regression-test/data/external_catalog_p0/iceberg/iceberg_schema_evolution.out
@@ -0,0 +1,79 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !rename1 --
+1	orig2_1	orig3_1
+2	orig2_2	orig3_2
+3	orig2_3	orig3_3
+4	orig2_4	rename3_1
+5	orig2_5	rename3_2
+6	orig2_6	rename3_3
+
+-- !rename2 --
+3	orig2_3	orig3_3
+4	orig2_4	rename3_1
+
+-- !drop1 --
+1	orig3_1
+2	orig3_2
+3	orig3_3
+4	orig3_4
+5	orig3_5
+6	orig3_6
+
+-- !drop2 --
+1	orig3_1
+2	orig3_2
+3	orig3_3
+
+-- !drop3 --
+4	orig3_4
+5	orig3_5
+6	orig3_6
+
+-- !add1 --
+1	orig2_1	orig3_1	\N
+2	orig2_2	orig3_2	\N
+3	orig2_3	orig3_3	\N
+4	orig2_4	orig3_4	add1_1
+5	orig2_5	orig3_5	add1_2
+6	orig2_6	orig3_6	add1_3
+
+-- !add2 --
+2	orig2_2	orig3_2	\N
+
+-- !add3 --
+5	orig2_5	orig3_5	add1_2
+
+-- !reorder1 --
+1	orig3_1	orig2_1
+2	orig3_2	orig2_2
+3	orig3_3	orig2_3
+4	orig3_4	orig2_4
+5	orig3_5	orig2_5
+6	orig3_6	orig2_6
+
+-- !reorder2 --
+2	orig3_2	orig2_2
+
+-- !reorder3 --
+5	orig3_5	orig2_5
+
+-- !readd1 --
+1	orig2_1	\N
+2	orig2_2	\N
+3	orig2_3	\N
+4	orig2_4	orig3_4
+5	orig2_5	orig3_5
+6	orig2_6	orig3_6
+
+-- !readd2 --
+1	orig2_1	\N
+2	orig2_2	\N
+3	orig2_3	\N
+4	orig2_4	orig3_4
+
+-- !readd3 --
+3	orig2_3	\N
+4	orig2_4	orig3_4
+5	orig2_5	orig3_5
+6	orig2_6	orig3_6
+
diff --git a/regression-test/suites/external_catalog_p0/iceberg/iceberg_schema_evolution.groovy b/regression-test/suites/external_catalog_p0/iceberg/iceberg_schema_evolution.groovy
new file mode 100644
index 0000000000..d014cafc03
--- /dev/null
+++ b/regression-test/suites/external_catalog_p0/iceberg/iceberg_schema_evolution.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_evolution", "p0") {
+    def rename1 = """select * from rename_test order by rename_1;"""
+    def rename2 = """select * from rename_test where rename_1 in (3, 4) order by rename_1;"""
+    def drop1 = """select * from drop_test order by orig1;"""
+    def drop2 = """select * from drop_test where orig1<=3 order by orig1;"""
+    def drop3 = """select * from drop_test where orig1>3 order by orig1;"""
+    def add1 = """select * from add_test order by orig1;"""
+    def add2 = """select * from add_test where orig1 = 2;"""
+    def add3 = """select * from add_test where orig1 = 5;"""
+    def reorder1 = """select * from reorder_test order by orig1;"""
+    def reorder2 = """select * from reorder_test where orig1 = 2;"""
+    def reorder3 = """select * from reorder_test where orig1 = 5;"""
+    def readd1 = """select * from readd_test order by orig1;"""
+    def readd2 = """select * from readd_test where orig1<5 order by orig1;"""
+    def readd3 = """select * from readd_test where orig1>2 order by orig1;"""
+
+
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "iceberg_schema_evolution"
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        sql """use iceberg_schema_evolution;"""
+        qt_rename1 rename1
+        qt_rename2 rename2
+        qt_drop1 drop1
+        qt_drop2 drop2
+        qt_drop3 drop3
+        qt_add1 add1
+        qt_add2 add2
+        qt_add3 add3
+        qt_reorder1 reorder1
+        qt_reorder2 reorder2
+        qt_reorder3 reorder3
+        qt_readd1 readd1
+        qt_readd2 readd2
+        qt_readd3 readd3
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org