Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/27 16:29:18 UTC

[doris] branch branch-1.2-lts updated (5c5030b6dd -> 893ca4be02)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a change to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


    from 5c5030b6dd [fix](icebergv2) fix icebergv2 delete file open bug and column case insensitive (#16135)
     new dc5588641e [fix](multi catalog)Support parquet and orc upper case column name (#16111)
     new ac729840b0 [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)
     new 893ca4be02 [fix](metric) fix be down when enable_system_metrics is false (#16140)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/common/daemon.cpp                           |  52 +++---
 be/src/util/doris_metrics.cpp                      |   7 -
 be/src/util/doris_metrics.h                        |   6 -
 be/src/util/system_metrics.cpp                     |  21 +++
 be/src/util/system_metrics.h                       |   9 +
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  34 ++--
 be/src/vec/exec/format/orc/vorc_reader.h           |   3 +
 be/src/vec/exec/format/parquet/schema_desc.cpp     |   8 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  | 197 +++++++++++++++++----
 .../exec/format/parquet/vparquet_group_reader.h    |   8 +
 be/src/vec/exec/format/table/iceberg_reader.cpp    |   4 +-
 be/src/vec/exec/format/table/iceberg_reader.h      |   4 +-
 .../org/apache/doris/catalog/IcebergTable.java     |   5 +
 .../doris/external/iceberg/util/IcebergUtils.java  |  14 +-
 .../org/apache/doris/planner/IcebergScanNode.java  |  27 +--
 .../planner/external/IcebergScanProvider.java      |   4 +-
 .../hive/test_upper_case_column_name.out           |  89 ++++++++++
 .../hive/test_upper_case_column_name.groovy        | 103 +++++++++++
 18 files changed, 469 insertions(+), 126 deletions(-)
 create mode 100644 regression-test/data/external_table_emr_p2/hive/test_upper_case_column_name.out
 create mode 100644 regression-test/suites/external_table_emr_p2/hive/test_upper_case_column_name.groovy




[doris] 02/03: [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ac729840b0812e8759ab91b08598a7fa53f89a7b
Author: Qi Chen <ka...@gmail.com>
AuthorDate: Sat Jan 28 00:04:27 2023 +0800

    [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)
    
    close #16023
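
For orientation: before this change, _merge_read_ranges split each page-index row
range around the deleted rows; after it, the ranges are kept whole (the function
collapses to a plain assignment) and deleted rows are instead zeroed out in a
row-level filter (_build_pos_delete_filter) that is later AND-ed with the predicate
filter and applied once per batch. Below is a minimal standalone sketch of that
index mapping, assuming sorted, non-overlapping ranges with an exclusive last_row
and sorted delete indices; RowRange and build_pos_delete_filter here are simplified
stand-ins, not the reader's actual API.

#include <cstdint>
#include <vector>

struct RowRange {
    int64_t first_row; // inclusive, relative to the row group
    int64_t last_row;  // exclusive
};

// delete_rows: sorted row indices (relative to the row group) taken from the
// position delete file. Returns a 0/1 filter over the rows the ranges will
// actually produce, with 0 marking deleted rows.
std::vector<uint8_t> build_pos_delete_filter(const std::vector<RowRange>& read_ranges,
                                             const std::vector<int64_t>& delete_rows) {
    size_t total = 0;
    for (const auto& r : read_ranges) total += r.last_row - r.first_row;
    std::vector<uint8_t> filter(total, 1);

    size_t rows_before = 0; // rows produced by the ranges already visited
    size_t di = 0;          // cursor into delete_rows
    for (const auto& range : read_ranges) {
        while (di < delete_rows.size() && delete_rows[di] < range.last_row) {
            if (delete_rows[di] >= range.first_row) {
                // Translate a row-group-relative index into an output-row index.
                filter[rows_before + (delete_rows[di] - range.first_row)] = 0;
            }
            ++di;
        }
        rows_before += range.last_row - range.first_row;
    }
    return filter;
}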
---
 .../exec/format/parquet/vparquet_group_reader.cpp  | 197 +++++++++++++++++----
 .../exec/format/parquet/vparquet_group_reader.h    |   8 +
 2 files changed, 168 insertions(+), 37 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0d7df17e4b..34b478114e 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -98,42 +98,35 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
                 _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
 
-        Status st =
-                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
+        if (block->rows() == 0) {
+            *read_rows = block->rows();
+            return Status::OK();
+        }
+
+        RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
+
+        std::vector<uint32_t> columns_to_filter;
+        int column_to_keep = block->columns();
+        columns_to_filter.resize(column_to_keep);
+        for (uint32_t i = 0; i < column_to_keep; ++i) {
+            columns_to_filter[i] = i;
+        }
+        if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &result_column_id));
+            ColumnPtr filter_column = block->get_by_position(result_column_id).column;
+            RETURN_IF_ERROR(_filter_block(block, filter_column, column_to_keep, columns_to_filter));
+        } else {
+            RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter));
+        }
+
         *read_rows = block->rows();
-        return st;
+        return Status::OK();
     }
 }
 
 void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
-    // row_ranges is generated from page index, and the row index begins with 0 in each row group.
-    // _position_delete_ctx is generated from delete file, and the row index begins with 0 in parquet file
-    for (auto& range : row_ranges) {
-        int64_t start_row_id = range.first_row;
-        while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
-            const int64_t& delete_row_id =
-                    _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
-                    _position_delete_ctx.first_row_id;
-            if (delete_row_id < range.first_row) {
-                _position_delete_ctx.index++;
-            } else if (delete_row_id < range.last_row) {
-                if (start_row_id < delete_row_id) {
-                    _read_ranges.emplace_back(start_row_id, delete_row_id);
-                }
-                start_row_id = delete_row_id + 1;
-                _position_delete_ctx.index++;
-            } else { // delete_row_id >= range.last_row
-                if (start_row_id < range.last_row) {
-                    _read_ranges.emplace_back(start_row_id, range.last_row);
-                    start_row_id = range.last_row + 1;
-                }
-                break;
-            }
-        }
-        if (start_row_id < range.last_row) {
-            _read_ranges.emplace_back(start_row_id, range.last_row);
-        }
-    }
+    _read_ranges = row_ranges;
 }
 
 Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns,
@@ -194,6 +187,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
                                                 _lazy_read_ctx.predicate_partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
                                               _lazy_read_ctx.predicate_missing_columns));
+
+        RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
+
         // generate filter vector
         if (_lazy_read_ctx.resize_first_column) {
             // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
@@ -262,8 +258,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
             // generated from next batch, so the filter column is removed ahead.
             DCHECK_EQ(block->rows(), 0);
         } else {
-            Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
-                                origin_column_num);
+            ColumnPtr filter_column = block->get_by_position(filter_column_id).column;
+            RETURN_IF_ERROR(_filter_block(block, filter_column, origin_column_num,
+                                          _lazy_read_ctx.all_predicate_col_ids));
         }
     } else {
         Block::erase_useless_column(block, origin_column_num);
@@ -302,16 +299,23 @@ const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
                     nullable_column->get_nested_column_ptr()->assume_mutable().get());
             uint8_t* filter_data = concrete_column->get_data().data();
             for (int i = 0; i < num_rows; ++i) {
-                // filter null if filter_column if nullable
-                filter_data[i] &= !null_map_column[i];
+                (*_filter_ptr)[i] &= (!null_map_column[i]) & filter_data[i];
             }
-            filter_map = filter_data;
+            filter_map = _filter_ptr->data();
         }
     } else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) {
         // filter all
         *can_filter_all = !const_column->get_bool(0);
     } else {
-        filter_map = assert_cast<const ColumnVector<UInt8>&>(*sv).get_data().data();
+        const IColumn::Filter& filter =
+                assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*sv).get_data();
+
+        auto* __restrict filter_data = filter.data();
+        const size_t size = filter.size();
+        for (size_t i = 0; i < size; ++i) {
+            (*_filter_ptr)[i] &= filter_data[i];
+        }
+        filter_map = filter.data();
     }
     return filter_map;
 }
@@ -434,6 +438,125 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b
     return Status::OK();
 }
 
+Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
+    _filter_ptr.reset(new IColumn::Filter(read_rows, 1));
+    if (!_position_delete_ctx.has_filter) {
+        _total_read_rows += read_rows;
+        return Status::OK();
+    }
+    while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
+        const int64_t delete_row_index_in_row_group =
+                _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
+                _position_delete_ctx.first_row_id;
+        int64_t read_range_rows = 0;
+        size_t remaining_read_rows = _total_read_rows + read_rows;
+        for (auto& range : _read_ranges) {
+            if (delete_row_index_in_row_group < range.first_row) {
+                ++_position_delete_ctx.index;
+                break;
+            } else if (delete_row_index_in_row_group < range.last_row) {
+                int64_t index = (delete_row_index_in_row_group - range.first_row) +
+                                read_range_rows - _total_read_rows;
+                if (index > read_rows - 1) {
+                    _total_read_rows += read_rows;
+                    return Status::OK();
+                }
+                (*_filter_ptr)[index] = 0;
+                ++_position_delete_ctx.index;
+                break;
+            } else { // delete_row >= range.last_row
+            }
+
+            int64_t range_size = range.last_row - range.first_row;
+            // Don't search next range when there is no remaining_read_rows.
+            if (remaining_read_rows <= range_size) {
+                _total_read_rows += read_rows;
+                return Status::OK();
+            } else {
+                remaining_read_rows -= range_size;
+                read_range_rows += range_size;
+            }
+        }
+    }
+    _total_read_rows += read_rows;
+    return Status::OK();
+}
+
+Status RowGroupReader::_filter_block(Block* block, const ColumnPtr filter_column,
+                                     int column_to_keep, std::vector<uint32_t> columns_to_filter) {
+    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+        ColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+
+        MutableColumnPtr mutable_holder =
+                nested_column->use_count() == 1
+                        ? nested_column->assume_mutable()
+                        : nested_column->clone_resized(nested_column->size());
+
+        ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(mutable_holder.get());
+        if (!concrete_column) {
+            return Status::InvalidArgument(
+                    "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).",
+                    filter_column->get_name());
+        }
+        auto* __restrict null_map = nullable_column->get_null_map_data().data();
+        IColumn::Filter& filter = concrete_column->get_data();
+        auto* __restrict filter_data = filter.data();
+
+        const size_t size = filter.size();
+        for (size_t i = 0; i < size; ++i) {
+            (*_filter_ptr)[i] &= (!null_map[i]) & filter_data[i];
+        }
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
+        bool ret = const_column->get_bool(0);
+        if (!ret) {
+            for (auto& col : columns_to_filter) {
+                std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+            }
+        }
+    } else {
+        const IColumn::Filter& filter =
+                assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*filter_column)
+                        .get_data();
+
+        auto* __restrict filter_data = filter.data();
+        const size_t size = filter.size();
+        for (size_t i = 0; i < size; ++i) {
+            (*_filter_ptr)[i] &= filter_data[i];
+        }
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    }
+    Block::erase_useless_column(block, column_to_keep);
+    return Status::OK();
+}
+
+Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
+                                     const std::vector<uint32_t>& columns_to_filter) {
+    RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    Block::erase_useless_column(block, column_to_keep);
+
+    return Status::OK();
+}
+
+Status RowGroupReader::_filter_block_internal(Block* block,
+                                              const std::vector<uint32_t>& columns_to_filter) {
+    size_t count = _filter_ptr->size() -
+                   simd::count_zero_num((int8_t*)_filter_ptr->data(), _filter_ptr->size());
+    if (count == 0) {
+        for (auto& col : columns_to_filter) {
+            std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+        }
+    } else {
+        for (auto& col : columns_to_filter) {
+            if (block->get_by_position(col).column->size() != count) {
+                block->get_by_position(col).column =
+                        block->get_by_position(col).column->filter(*_filter_ptr, count);
+            }
+        }
+    }
+    return Status::OK();
+}
+
 ParquetColumnReader::Statistics RowGroupReader::statistics() {
     ParquetColumnReader::Statistics st;
     for (auto& reader : _column_readers) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 53e8a052c0..fcc1e387a8 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -130,6 +130,12 @@ private:
     Status _fill_missing_columns(
             Block* block, size_t rows,
             const std::unordered_map<std::string, VExprContext*>& missing_columns);
+    Status _build_pos_delete_filter(size_t read_rows);
+    Status _filter_block(Block* block, const ColumnPtr filter_column, int column_to_keep,
+                         std::vector<uint32_t> columns_to_filter);
+    Status _filter_block(Block* block, int column_to_keep,
+                         const vector<uint32_t>& columns_to_filter);
+    Status _filter_block_internal(Block* block, const vector<uint32_t>& columns_to_filter);
 
     doris::FileReader* _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
@@ -147,5 +153,7 @@ private:
     // If continuous batches are skipped, we can cache them to skip a whole page
     size_t _cached_filtered_rows = 0;
     std::unique_ptr<TextConverter> _text_converter = nullptr;
+    std::unique_ptr<IColumn::Filter> _filter_ptr = nullptr;
+    int64_t _total_read_rows = 0;
 };
 } // namespace doris::vectorized




[doris] 03/03: [fix](metric) fix be down when enable_system_metrics is false (#16140)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 893ca4be02fef717241be68382a0b0348d19839c
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Sat Jan 28 00:10:39 2023 +0800

    [fix](metric) fix be down when enable_system_metrics is false (#16140)
    
    If enable_system_metrics is set to false, the BE goes down with the message
    "enable metric calculator failed, maybe you set enable_system_metrics to false".
    This change removes that startup check and instead guards every system-metrics
    access behind the config flag.
    Co-authored-by: caiconghui1 <ca...@jd.com>
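
The essence of the fix is visible in the daemon.cpp hunk below: the startup CHECK
on DorisMetrics::is_inited() is removed, and each call into system_metrics() is
wrapped in an enable_system_metrics guard so a null SystemMetrics pointer is never
dereferenced. A minimal standalone sketch of that pattern follows; the types and
the free system_metrics() function are stand-ins for the real classes in
be/src/util/ and the flag in be/src/common/config.h.

#include <cstdint>
#include <map>
#include <string>

namespace config {
bool enable_system_metrics = false; // stand-in for the BE config flag
}

struct SystemMetrics { // stand-in for be/src/util/system_metrics.h
    void get_disks_io_time(std::map<std::string, int64_t>* out) const {}
};

// Returns nullptr when system metrics were never initialized, mirroring
// DorisMetrics::system_metrics() after this change.
const SystemMetrics* system_metrics() {
    static const SystemMetrics inst;
    return config::enable_system_metrics ? &inst : nullptr;
}

void calculate_metrics_once(std::map<std::string, int64_t>* lst_disks_io_time) {
    // The fix: gate every access on the flag instead of CHECK-failing at
    // startup, so the calculator thread runs safely with metrics disabled.
    if (config::enable_system_metrics) {
        system_metrics()->get_disks_io_time(lst_disks_io_time);
    }
}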
---
 be/src/common/daemon.cpp                           | 52 +++++++++++-----------
 be/src/util/doris_metrics.cpp                      |  7 ---
 be/src/util/doris_metrics.h                        |  6 ---
 be/src/util/system_metrics.cpp                     | 21 +++++++++
 be/src/util/system_metrics.h                       |  9 ++++
 .../exec/format/parquet/vparquet_group_reader.h    |  4 +-
 6 files changed, 58 insertions(+), 41 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index aabb3eb9b9..992b6528b3 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -277,9 +277,11 @@ void Daemon::calculate_metrics_thread() {
         if (last_ts == -1L) {
             last_ts = GetMonoTimeMicros() / 1000;
             lst_query_bytes = DorisMetrics::instance()->query_scan_bytes->value();
-            DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
-            DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes,
-                                                                            &lst_net_receive_bytes);
+            if (config::enable_system_metrics) {
+                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
+                DorisMetrics::instance()->system_metrics()->get_network_traffic(
+                        &lst_net_send_bytes, &lst_net_receive_bytes);
+            }
         } else {
             int64_t current_ts = GetMonoTimeMicros() / 1000;
             long interval = (current_ts - last_ts) / 1000;
@@ -291,23 +293,27 @@ void Daemon::calculate_metrics_thread() {
             DorisMetrics::instance()->query_scan_bytes_per_second->set_value(qps < 0 ? 0 : qps);
             lst_query_bytes = current_query_bytes;
 
-            // 2. max disk io util
-            DorisMetrics::instance()->max_disk_io_util_percent->set_value(
-                    DorisMetrics::instance()->system_metrics()->get_max_io_util(lst_disks_io_time,
-                                                                                15));
-            // update lst map
-            DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
-
-            // 3. max network traffic
-            int64_t max_send = 0;
-            int64_t max_receive = 0;
-            DorisMetrics::instance()->system_metrics()->get_max_net_traffic(
-                    lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
-            DorisMetrics::instance()->max_network_send_bytes_rate->set_value(max_send);
-            DorisMetrics::instance()->max_network_receive_bytes_rate->set_value(max_receive);
-            // update lst map
-            DorisMetrics::instance()->system_metrics()->get_network_traffic(&lst_net_send_bytes,
-                                                                            &lst_net_receive_bytes);
+            if (config::enable_system_metrics) {
+                // 2. max disk io util
+                DorisMetrics::instance()->system_metrics()->update_max_disk_io_util_percent(
+                        lst_disks_io_time, 15);
+
+                // update lst map
+                DorisMetrics::instance()->system_metrics()->get_disks_io_time(&lst_disks_io_time);
+
+                // 3. max network traffic
+                int64_t max_send = 0;
+                int64_t max_receive = 0;
+                DorisMetrics::instance()->system_metrics()->get_max_net_traffic(
+                        lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
+                DorisMetrics::instance()->system_metrics()->update_max_network_send_bytes_rate(
+                        max_send);
+                DorisMetrics::instance()->system_metrics()->update_max_network_receive_bytes_rate(
+                        max_receive);
+                // update lst map
+                DorisMetrics::instance()->system_metrics()->get_network_traffic(
+                        &lst_net_send_bytes, &lst_net_receive_bytes);
+            }
         }
     } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
 }
@@ -434,12 +440,6 @@ void Daemon::start() {
     CHECK(st.ok()) << st.to_string();
 
     if (config::enable_metric_calculator) {
-        CHECK(DorisMetrics::instance()->is_inited())
-                << "enable metric calculator failed, maybe you set enable_system_metrics to false "
-                << " or there may be some hardware error which causes metric init failed, please "
-                   "check log first;"
-                << " you can set enable_metric_calculator = false to quickly recover ";
-
         st = Thread::create(
                 "Daemon", "calculate_metrics_thread",
                 [this]() { this->calculate_metrics_thread(); }, &_calculate_metrics_thread);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 15be14b06e..6f61ef7c36 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -142,9 +142,6 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(compaction_waitting_permits, MetricUnit::NOUN
 DEFINE_HISTOGRAM_METRIC_PROTOTYPE_2ARG(tablet_version_num_distribution, MetricUnit::NOUNIT);
 
 DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(query_scan_bytes_per_second, MetricUnit::BYTES);
-DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(max_disk_io_util_percent, MetricUnit::PERCENT);
-DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(max_network_send_bytes_rate, MetricUnit::BYTES);
-DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(max_network_receive_bytes_rate, MetricUnit::BYTES);
 
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(readable_blocks_total, MetricUnit::BLOCKS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(writable_blocks_total, MetricUnit::BLOCKS);
@@ -268,9 +265,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     HISTOGRAM_METRIC_REGISTER(_server_metric_entity, tablet_version_num_distribution);
 
     INT_GAUGE_METRIC_REGISTER(_server_metric_entity, query_scan_bytes_per_second);
-    INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_disk_io_util_percent);
-    INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_send_bytes_rate);
-    INT_GAUGE_METRIC_REGISTER(_server_metric_entity, max_network_receive_bytes_rate);
 
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes);
@@ -311,7 +305,6 @@ void DorisMetrics::initialize(bool init_system_metrics, const std::set<std::stri
     if (init_system_metrics) {
         _system_metrics.reset(
                 new SystemMetrics(&_metric_registry, disk_devices, network_interfaces));
-        _is_inited = true;
     }
 }
 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 7536be7372..d117456d01 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -147,9 +147,6 @@ public:
     // The following metrics will be calculated
     // by metric calculator
     IntGauge* query_scan_bytes_per_second;
-    IntGauge* max_disk_io_util_percent;
-    IntGauge* max_network_send_bytes_rate;
-    IntGauge* max_network_receive_bytes_rate;
 
     // Metrics related with file reader/writer
     IntCounter* local_file_reader_total;
@@ -226,7 +223,6 @@ public:
     MetricRegistry* metric_registry() { return &_metric_registry; }
     SystemMetrics* system_metrics() { return _system_metrics.get(); }
     MetricEntity* server_entity() { return _server_metric_entity.get(); }
-    bool is_inited() { return _is_inited; }
 
 private:
     // Don't allow constructor
@@ -245,8 +241,6 @@ private:
     std::unique_ptr<SystemMetrics> _system_metrics;
 
     std::shared_ptr<MetricEntity> _server_metric_entity;
-
-    bool _is_inited = false;
 };
 
 }; // namespace doris
diff --git a/be/src/util/system_metrics.cpp b/be/src/util/system_metrics.cpp
index d1926aedd6..11703f2187 100644
--- a/be/src/util/system_metrics.cpp
+++ b/be/src/util/system_metrics.cpp
@@ -301,6 +301,10 @@ struct ProcMetrics {
     IntAtomicCounter* proc_procs_blocked;
 };
 
+DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(_max_disk_io_util_percent, MetricUnit::PERCENT);
+DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(_max_network_send_bytes_rate, MetricUnit::BYTES);
+DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(_max_network_receive_bytes_rate, MetricUnit::BYTES);
+
 const char* SystemMetrics::_s_hook_name = "system_metrics";
 
 SystemMetrics::SystemMetrics(MetricRegistry* registry, const std::set<std::string>& disk_devices,
@@ -318,6 +322,10 @@ SystemMetrics::SystemMetrics(MetricRegistry* registry, const std::set<std::strin
     _install_snmp_metrics(_server_entity.get());
     _install_load_avg_metrics(_server_entity.get());
     _install_proc_metrics(_server_entity.get());
+
+    INT_GAUGE_METRIC_REGISTER(_server_entity.get(), _max_disk_io_util_percent);
+    INT_GAUGE_METRIC_REGISTER(_server_entity.get(), _max_network_send_bytes_rate);
+    INT_GAUGE_METRIC_REGISTER(_server_entity.get(), _max_network_receive_bytes_rate);
 }
 
 SystemMetrics::~SystemMetrics() {
@@ -858,6 +866,19 @@ void SystemMetrics::get_max_net_traffic(const std::map<std::string, int64_t>& ls
     *rcv_rate = max_rcv / interval_sec;
 }
 
+void SystemMetrics::update_max_disk_io_util_percent(const std::map<std::string, int64_t>& lst_value,
+                                                    int64_t interval_sec) {
+    _max_disk_io_util_percent->set_value(get_max_io_util(lst_value, interval_sec));
+}
+
+void SystemMetrics::update_max_network_send_bytes_rate(int64_t max_send_bytes_rate) {
+    _max_network_send_bytes_rate->set_value(max_send_bytes_rate);
+}
+
+void SystemMetrics::update_max_network_receive_bytes_rate(int64_t max_receive_bytes_rate) {
+    _max_network_receive_bytes_rate->set_value(max_receive_bytes_rate);
+}
+
 void SystemMetrics::_install_proc_metrics(MetricEntity* entity) {
     _proc_metrics.reset(new ProcMetrics(entity));
 }
diff --git a/be/src/util/system_metrics.h b/be/src/util/system_metrics.h
index 903588602d..5354f494de 100644
--- a/be/src/util/system_metrics.h
+++ b/be/src/util/system_metrics.h
@@ -51,6 +51,11 @@ public:
                              const std::map<std::string, int64_t>& lst_rcv_map,
                              int64_t interval_sec, int64_t* send_rate, int64_t* rcv_rate);
 
+    void update_max_disk_io_util_percent(const std::map<std::string, int64_t>& lst_value,
+                                         int64_t interval_sec);
+    void update_max_network_send_bytes_rate(int64_t max_send_bytes_rate);
+    void update_max_network_receive_bytes_rate(int64_t max_receive_bytes_rate);
+
 private:
     void _install_cpu_metrics();
     // On Intel(R) Xeon(R) CPU E5-2450 0 @ 2.10GHz;
@@ -99,6 +104,10 @@ private:
     size_t _line_buf_size = 0;
     MetricRegistry* _registry = nullptr;
     std::shared_ptr<MetricEntity> _server_entity = nullptr;
+
+    IntGauge* _max_disk_io_util_percent;
+    IntGauge* _max_network_send_bytes_rate;
+    IntGauge* _max_network_receive_bytes_rate;
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index fcc1e387a8..b0a40029ec 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -134,8 +134,8 @@ private:
     Status _filter_block(Block* block, const ColumnPtr filter_column, int column_to_keep,
                          std::vector<uint32_t> columns_to_filter);
     Status _filter_block(Block* block, int column_to_keep,
-                         const vector<uint32_t>& columns_to_filter);
-    Status _filter_block_internal(Block* block, const vector<uint32_t>& columns_to_filter);
+                         const std::vector<uint32_t>& columns_to_filter);
+    Status _filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter);
 
     doris::FileReader* _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;




[doris] 01/03: [fix](multi catalog)Support parquet and orc upper case column name (#16111)

Posted by mo...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit dc5588641ec2318c0969f03e1cfad91e77679eb8
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Fri Jan 27 23:52:11 2023 +0800

    [fix](multi catalog)Support parquet and orc upper case column name (#16111)
    
    Column names of external HMS catalog tables are all lower case in Doris,
    while an Iceberg table, or a Hive table created by Spark SQL, may contain
    upper-case column names, which leads to empty query results. This PR fixes the bug:
    1. For Parquet files, convert all column names to lower case while parsing the Parquet metadata.
    2. For ORC files, store the original column names and the lower-case column names in two
       vectors and use whichever form each code path needs (see the sketch after this message).
    3. On the FE side, map each column name back to its original Iceberg name when doing convertToIcebergExpr.
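
A minimal standalone sketch of point 2 above, assuming the requested Doris column
names are already lower case; to_lower, ReadColumns, and init_read_columns are
hypothetical helpers illustrating the two-vector mapping, not the OrcReader API:

#include <algorithm>
#include <cctype>
#include <string>
#include <vector>

static std::string to_lower(std::string s) {
    std::transform(s.begin(), s.end(), s.begin(),
                   [](unsigned char c) { return std::tolower(c); });
    return s;
}

struct ReadColumns {
    std::vector<std::string> read_cols;            // original ORC names, passed to the ORC reader
    std::vector<std::string> read_cols_lower_case; // lower-case names, used to look up block columns
    std::vector<std::string> missing_cols;         // requested but absent from the file
};

static ReadColumns init_read_columns(const std::vector<std::string>& orc_field_names,
                                     const std::vector<std::string>& doris_column_names) {
    std::vector<std::string> orc_lower;
    orc_lower.reserve(orc_field_names.size());
    for (const auto& n : orc_field_names) orc_lower.push_back(to_lower(n));

    ReadColumns result;
    for (const auto& col : doris_column_names) { // already lower case in Doris
        auto it = std::find(orc_lower.begin(), orc_lower.end(), col);
        if (it == orc_lower.end()) {
            result.missing_cols.push_back(col);
        } else {
            // Keep both spellings: the original for reading the file,
            // the lower-case one for addressing the Doris block.
            result.read_cols.push_back(orc_field_names[it - orc_lower.begin()]);
            result.read_cols_lower_case.push_back(col);
        }
    }
    return result;
}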
---
 be/src/vec/exec/format/orc/vorc_reader.cpp         |  34 ++++---
 be/src/vec/exec/format/orc/vorc_reader.h           |   3 +
 be/src/vec/exec/format/parquet/schema_desc.cpp     |   8 +-
 be/src/vec/exec/format/table/iceberg_reader.cpp    |   4 +-
 be/src/vec/exec/format/table/iceberg_reader.h      |   4 +-
 .../org/apache/doris/catalog/IcebergTable.java     |   5 +
 .../doris/external/iceberg/util/IcebergUtils.java  |  14 +--
 .../org/apache/doris/planner/IcebergScanNode.java  |  27 +-----
 .../planner/external/IcebergScanProvider.java      |   4 +-
 .../hive/test_upper_case_column_name.out           |  89 ++++++++++++++++++
 .../hive/test_upper_case_column_name.groovy        | 103 +++++++++++++++++++++
 11 files changed, 245 insertions(+), 50 deletions(-)

diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 2748160e97..c295712491 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -170,7 +170,7 @@ Status OrcReader::init_reader(
     auto& selected_type = _row_reader->getSelectedType();
     _col_orc_type.resize(selected_type.getSubtypeCount());
     for (int i = 0; i < selected_type.getSubtypeCount(); ++i) {
-        _colname_to_idx[selected_type.getFieldName(i)] = i;
+        _colname_to_idx[_get_field_name_lower_case(&selected_type, i)] = i;
         _col_orc_type[i] = selected_type.getSubtype(i);
     }
     return Status::OK();
@@ -204,7 +204,7 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
 
     auto& root_type = _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        col_names->emplace_back(root_type.getFieldName(i));
+        col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
         col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i)));
     }
     return Status::OK();
@@ -212,15 +212,20 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
 
 Status OrcReader::_init_read_columns() {
     auto& root_type = _reader->getType();
-    std::unordered_set<std::string> orc_cols;
+    std::vector<std::string> orc_cols;
+    std::vector<std::string> orc_cols_lower_case;
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        orc_cols.emplace(root_type.getFieldName(i));
+        orc_cols.emplace_back(root_type.getFieldName(i));
+        orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&root_type, i));
     }
     for (auto& col_name : _column_names) {
-        if (orc_cols.find(col_name) == orc_cols.end()) {
+        auto iter = std::find(orc_cols_lower_case.begin(), orc_cols_lower_case.end(), col_name);
+        if (iter == orc_cols_lower_case.end()) {
             _missing_cols.emplace_back(col_name);
         } else {
-            _read_cols.emplace_back(col_name);
+            int pos = std::distance(orc_cols_lower_case.begin(), iter);
+            _read_cols.emplace_back(orc_cols[pos]);
+            _read_cols_lower_case.emplace_back(col_name);
         }
     }
     return Status::OK();
@@ -478,7 +483,7 @@ void OrcReader::_init_search_argument(
     auto& root_type = _reader->getType();
     std::unordered_map<std::string, const orc::Type*> type_map;
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        type_map.emplace(root_type.getFieldName(i), root_type.getSubtype(i));
+        type_map.emplace(_get_field_name_lower_case(&root_type, i), root_type.getSubtype(i));
     }
     for (auto it = colname_to_value_range->begin(); it != colname_to_value_range->end(); ++it) {
         auto type_it = type_map.find(it->first);
@@ -558,7 +563,7 @@ TypeDescriptor OrcReader::_convert_to_doris_type(const orc::Type* orc_type) {
         TypeDescriptor struct_type(PrimitiveType::TYPE_STRUCT);
         for (int i = 0; i < orc_type->getSubtypeCount(); ++i) {
             struct_type.children.emplace_back(_convert_to_doris_type(orc_type->getSubtype(i)));
-            struct_type.field_names.emplace_back(orc_type->getFieldName(i));
+            struct_type.field_names.emplace_back(_get_field_name_lower_case(orc_type, i));
         }
         return struct_type;
     }
@@ -571,7 +576,8 @@ std::unordered_map<std::string, TypeDescriptor> OrcReader::get_name_to_type() {
     std::unordered_map<std::string, TypeDescriptor> map;
     auto& root_type = _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        map.emplace(root_type.getFieldName(i), _convert_to_doris_type(root_type.getSubtype(i)));
+        map.emplace(_get_field_name_lower_case(&root_type, i),
+                    _convert_to_doris_type(root_type.getSubtype(i)));
     }
     return map;
 }
@@ -580,7 +586,7 @@ Status OrcReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n
                               std::unordered_set<std::string>* missing_cols) {
     auto& root_type = _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
-        name_to_type->emplace(root_type.getFieldName(i),
+        name_to_type->emplace(_get_field_name_lower_case(&root_type, i),
                               _convert_to_doris_type(root_type.getSubtype(i)));
     }
     for (auto& col : _missing_cols) {
@@ -735,6 +741,12 @@ Status OrcReader::_orc_column_to_doris_column(const std::string& col_name,
     return Status::InternalError("Unsupported type for column '{}'", col_name);
 }
 
+std::string OrcReader::_get_field_name_lower_case(const orc::Type* orc_type, int pos) {
+    std::string name = orc_type->getFieldName(pos);
+    transform(name.begin(), name.end(), name.begin(), ::tolower);
+    return name;
+}
+
 Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
     SCOPED_RAW_TIMER(&_statistics.column_read_time);
     {
@@ -746,7 +758,7 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
         }
     }
     const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields;
-    for (auto& col : _read_cols) {
+    for (auto& col : _read_cols_lower_case) {
         auto& column_with_type_and_name = block->get_by_name(col);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 98c2fc7c02..aa3efa8d2c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -258,6 +258,8 @@ private:
                                      const MutableColumnPtr& data_column, orc::ListVectorBatch* lvb,
                                      size_t num_values, size_t* element_size);
 
+    std::string _get_field_name_lower_case(const orc::Type* orc_type, int pos);
+
     RuntimeProfile* _profile;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
@@ -269,6 +271,7 @@ private:
     cctz::time_zone _time_zone;
 
     std::list<std::string> _read_cols;
+    std::list<std::string> _read_cols_lower_case;
     std::list<std::string> _missing_cols;
     std::unordered_map<std::string, int> _colname_to_idx;
     std::vector<const orc::Type*> _col_orc_type;
diff --git a/be/src/vec/exec/format/parquet/schema_desc.cpp b/be/src/vec/exec/format/parquet/schema_desc.cpp
index b8b9b07184..e1a5225ff8 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.cpp
+++ b/be/src/vec/exec/format/parquet/schema_desc.cpp
@@ -129,7 +129,9 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
         auto child = &node_field->children[0];
         parse_physical_field(t_schema, false, child);
 
-        node_field->name = t_schema.name;
+        std::string lower_case_name = t_schema.name;
+        transform(lower_case_name.begin(), lower_case_name.end(), lower_case_name.begin(), ::tolower);
+        node_field->name = lower_case_name;
         node_field->type.type = TYPE_ARRAY;
         node_field->is_nullable = false;
         _next_schema_pos = curr_pos + 1;
@@ -146,7 +148,9 @@ Status FieldDescriptor::parse_node_field(const std::vector<tparquet::SchemaEleme
 
 void FieldDescriptor::parse_physical_field(const tparquet::SchemaElement& physical_schema,
                                            bool is_nullable, FieldSchema* physical_field) {
-    physical_field->name = physical_schema.name;
+    std::string lower_case_name = physical_schema.name;
+    transform(lower_case_name.begin(), lower_case_name.end(), lower_case_name.begin(), ::tolower);
+    physical_field->name = lower_case_name;
     physical_field->parquet_schema = physical_schema;
     physical_field->is_nullable = is_nullable;
     physical_field->physical_type = physical_schema.type;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index f636453a00..a3302aeb09 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -58,8 +58,8 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim
 }
 
 Status IcebergTableReader::init_reader(
-        std::vector<std::string>& file_col_names,
-        std::unordered_map<int, std::string>& col_id_name_map,
+        const std::vector<std::string>& file_col_names,
+        const std::unordered_map<int, std::string>& col_id_name_map,
         std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
         VExprContext* vconjunct_ctx) {
     ParquetReader* parquet_reader = static_cast<ParquetReader*>(_file_format_reader.get());
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index bf7a361370..c97577ad05 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -53,8 +53,8 @@ public:
                        std::unordered_set<std::string>* missing_cols) override;
 
     Status init_reader(
-            std::vector<std::string>& file_col_names,
-            std::unordered_map<int, std::string>& col_id_name_map,
+            const std::vector<std::string>& file_col_names,
+            const std::unordered_map<int, std::string>& col_id_name_map,
             std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
             VExprContext* vconjunct_ctx);
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
index 834f30fab0..8d6907ff6a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/IcebergTable.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
 import org.apache.iceberg.catalog.TableIdentifier;
@@ -171,6 +172,10 @@ public class IcebergTable extends Table {
         return fileFormat;
     }
 
+    public Schema getIcebergSchema() {
+        return icebergTable.schema();
+    }
+
     private org.apache.iceberg.Table getTable() throws Exception {
         if (isLoaded.get()) {
             Preconditions.checkNotNull(icebergTable);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
index 325541e79d..876842ea06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/external/iceberg/util/IcebergUtils.java
@@ -220,7 +220,7 @@ public class IcebergUtils {
         return DorisTypeVisitor.visit(type, new DorisTypeToType());
     }
 
-    public static Expression convertToIcebergExpr(Expr expr) {
+    public static Expression convertToIcebergExpr(Expr expr, Schema schema) {
         if (expr == null) {
             return null;
         }
@@ -241,23 +241,23 @@ public class IcebergUtils {
             CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
             switch (compoundPredicate.getOp()) {
                 case AND: {
-                    Expression left = convertToIcebergExpr(compoundPredicate.getChild(0));
-                    Expression right = convertToIcebergExpr(compoundPredicate.getChild(1));
+                    Expression left = convertToIcebergExpr(compoundPredicate.getChild(0), schema);
+                    Expression right = convertToIcebergExpr(compoundPredicate.getChild(1), schema);
                     if (left != null && right != null) {
                         return Expressions.and(left, right);
                     }
                     return null;
                 }
                 case OR: {
-                    Expression left = convertToIcebergExpr(compoundPredicate.getChild(0));
-                    Expression right = convertToIcebergExpr(compoundPredicate.getChild(1));
+                    Expression left = convertToIcebergExpr(compoundPredicate.getChild(0), schema);
+                    Expression right = convertToIcebergExpr(compoundPredicate.getChild(1), schema);
                     if (left != null && right != null) {
                         return Expressions.or(left, right);
                     }
                     return null;
                 }
                 case NOT: {
-                    Expression child = convertToIcebergExpr(compoundPredicate.getChild(0));
+                    Expression child = convertToIcebergExpr(compoundPredicate.getChild(0), schema);
                     if (child != null) {
                         return Expressions.not(child);
                     }
@@ -290,6 +290,8 @@ public class IcebergUtils {
                     return null;
                 }
                 String colName = slotRef.getColumnName();
+                Types.NestedField nestedField = schema.caseInsensitiveFindField(colName);
+                colName = nestedField.name();
                 Object value = extractDorisLiteral(literalExpr);
                 if (value == null) {
                     if (opCode == TExprOpcode.EQ_FOR_NULL && literalExpr instanceof NullLiteral) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
index 4afc4305bd..e4271d161f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/IcebergScanNode.java
@@ -19,12 +19,10 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.catalog.IcebergProperty;
 import org.apache.doris.catalog.IcebergTable;
 import org.apache.doris.common.UserException;
-import org.apache.doris.external.iceberg.util.IcebergUtils;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TBrokerFileStatus;
@@ -37,7 +35,6 @@ import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
 
 public class IcebergScanNode extends BrokerScanNode {
     private static final Logger LOG = LogManager.getLogger(IcebergScanNode.class);
@@ -74,28 +71,7 @@ public class IcebergScanNode extends BrokerScanNode {
 
     @Override
     protected void getFileStatus() throws UserException {
-        // extract iceberg conjuncts
-        ListIterator<Expr> it = conjuncts.listIterator();
-        while (it.hasNext()) {
-            Expression expression = IcebergUtils.convertToIcebergExpr(it.next());
-            if (expression != null) {
-                icebergPredicates.add(expression);
-            }
-        }
-        // get iceberg file status
-        List<TBrokerFileStatus> fileStatuses;
-        try {
-            fileStatuses = icebergTable.getIcebergDataFiles(icebergPredicates);
-        } catch (Exception e) {
-            LOG.warn("errors while load iceberg table {} data files.", icebergTable.getName(), e);
-            throw new UserException("errors while load Iceberg table ["
-                    + icebergTable.getName() + "] data files.");
-        }
-        fileStatusesList.add(fileStatuses);
-        filesAdded += fileStatuses.size();
-        for (TBrokerFileStatus fstatus : fileStatuses) {
-            LOG.debug("Add file status is {}", fstatus);
-        }
+        throw new UserException("IcebergScanNode is deprecated");
     }
 
     @Override
@@ -110,3 +86,4 @@ public class IcebergScanNode extends BrokerScanNode {
         return output.toString();
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index ca68f755d5..5c211c34f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -124,14 +124,14 @@ public class IcebergScanProvider extends HiveScanProvider {
     @Override
     public List<InputSplit> getSplits(List<Expr> exprs) throws UserException {
         List<Expression> expressions = new ArrayList<>();
+        org.apache.iceberg.Table table = HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
         for (Expr conjunct : exprs) {
-            Expression expression = IcebergUtils.convertToIcebergExpr(conjunct);
+            Expression expression = IcebergUtils.convertToIcebergExpr(conjunct, table.schema());
             if (expression != null) {
                 expressions.add(expression);
             }
         }
 
-        org.apache.iceberg.Table table = HiveMetaStoreClientHelper.getIcebergTable(hmsTable);
         TableScan scan = table.newScan();
         for (Expression predicate : expressions) {
             scan = scan.filter(predicate);
diff --git a/regression-test/data/external_table_emr_p2/hive/test_upper_case_column_name.out b/regression-test/data/external_table_emr_p2/hive/test_upper_case_column_name.out
new file mode 100644
index 0000000000..1b39ef2771
--- /dev/null
+++ b/regression-test/data/external_table_emr_p2/hive/test_upper_case_column_name.out
@@ -0,0 +1,89 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !hiveParquet1 --
+1	name
+
+-- !hiveParquet2 --
+1	name
+
+-- !hiveParquet3 --
+
+-- !hiveParquet4 --
+1	name
+
+-- !hiveParquet5 --
+
+-- !hiveParquet6 --
+1
+
+-- !hiveParquet7 --
+name
+
+-- !hiveParquet8 --
+1	name
+
+-- !hiveOrc1 --
+1	name
+
+-- !hiveOrc2 --
+1	name
+
+-- !hiveOrc3 --
+
+-- !hiveOrc4 --
+1	name
+
+-- !hiveOrc5 --
+
+-- !hiveOrc6 --
+1
+
+-- !hiveOrc7 --
+name
+
+-- !hiveOrc8 --
+1	name
+
+-- !icebergParquet1 --
+1	name
+
+-- !icebergParquet2 --
+1	name
+
+-- !icebergParquet3 --
+
+-- !icebergParquet4 --
+1	name
+
+-- !icebergParquet5 --
+
+-- !icebergParquet6 --
+1
+
+-- !icebergParquet7 --
+name
+
+-- !icebergParquet8 --
+1	name
+
+-- !icebergOrc1 --
+1	name
+
+-- !icebergOrc2 --
+1	name
+
+-- !icebergOrc3 --
+
+-- !icebergOrc4 --
+1	name
+
+-- !icebergOrc5 --
+
+-- !icebergOrc6 --
+1
+
+-- !icebergOrc7 --
+name
+
+-- !icebergOrc8 --
+1	name
+
diff --git a/regression-test/suites/external_table_emr_p2/hive/test_upper_case_column_name.groovy b/regression-test/suites/external_table_emr_p2/hive/test_upper_case_column_name.groovy
new file mode 100644
index 0000000000..30e78a0512
--- /dev/null
+++ b/regression-test/suites/external_table_emr_p2/hive/test_upper_case_column_name.groovy
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("upper_case_column_name", "p2") {
+    def hiveParquet1 = """select * from hive_upper_case_parquet;"""
+    def hiveParquet2 = """select * from hive_upper_case_parquet where id=1;"""
+    def hiveParquet3 = """select * from hive_upper_case_parquet where id>1;"""
+    def hiveParquet4 = """select * from hive_upper_case_parquet where name='name';"""
+    def hiveParquet5 = """select * from hive_upper_case_parquet where name!='name';"""
+    def hiveParquet6 = """select id from hive_upper_case_parquet where id=1;"""
+    def hiveParquet7 = """select name from hive_upper_case_parquet where id=1;"""
+    def hiveParquet8 = """select id, name from hive_upper_case_parquet where id=1;"""
+    def hiveOrc1 = """select * from hive_upper_case_orc;"""
+    def hiveOrc2 = """select * from hive_upper_case_orc where id=1;"""
+    def hiveOrc3 = """select * from hive_upper_case_orc where id>1;"""
+    def hiveOrc4 = """select * from hive_upper_case_orc where name='name';"""
+    def hiveOrc5 = """select * from hive_upper_case_orc where name!='name';"""
+    def hiveOrc6 = """select id from hive_upper_case_orc where id=1;"""
+    def hiveOrc7 = """select name from hive_upper_case_orc where id=1;"""
+    def hiveOrc8 = """select id, name from hive_upper_case_orc where id=1;"""
+    def icebergParquet1 = """select * from iceberg_upper_case_parquet;"""
+    def icebergParquet2 = """select * from iceberg_upper_case_parquet where id=1;"""
+    def icebergParquet3 = """select * from iceberg_upper_case_parquet where id>1;"""
+    def icebergParquet4 = """select * from iceberg_upper_case_parquet where name='name';"""
+    def icebergParquet5 = """select * from iceberg_upper_case_parquet where name!='name';"""
+    def icebergParquet6 = """select id from iceberg_upper_case_parquet where id=1;"""
+    def icebergParquet7 = """select name from iceberg_upper_case_parquet where id=1;"""
+    def icebergParquet8 = """select id, name from iceberg_upper_case_parquet where id=1;"""
+    def icebergOrc1 = """select * from iceberg_upper_case_orc;"""
+    def icebergOrc2 = """select * from iceberg_upper_case_orc where id=1;"""
+    def icebergOrc3 = """select * from iceberg_upper_case_orc where id>1;"""
+    def icebergOrc4 = """select * from iceberg_upper_case_orc where name='name';"""
+    def icebergOrc5 = """select * from iceberg_upper_case_orc where name!='name';"""
+    def icebergOrc6 = """select id from iceberg_upper_case_orc where id=1;"""
+    def icebergOrc7 = """select name from iceberg_upper_case_orc where id=1;"""
+    def icebergOrc8 = """select id, name from iceberg_upper_case_orc where id=1;"""
+
+
+    String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String extHiveHmsHost = context.config.otherConfigs.get("extHiveHmsHost")
+        String extHiveHmsPort = context.config.otherConfigs.get("extHiveHmsPort")
+        String catalog_name = "upper_case"
+        sql """drop catalog if exists ${catalog_name};"""
+        sql """
+            create catalog if not exists ${catalog_name} properties (
+                'type'='hms',
+                'hive.metastore.uris' = 'thrift://${extHiveHmsHost}:${extHiveHmsPort}'
+            );
+        """
+        logger.info("catalog " + catalog_name + " created")
+        sql """switch ${catalog_name};"""
+        logger.info("switched to catalog " + catalog_name)
+        sql """use multi_catalog;"""
+        qt_hiveParquet1 hiveParquet1
+        qt_hiveParquet2 hiveParquet2
+        qt_hiveParquet3 hiveParquet3
+        qt_hiveParquet4 hiveParquet4
+        qt_hiveParquet5 hiveParquet5
+        qt_hiveParquet6 hiveParquet6
+        qt_hiveParquet7 hiveParquet7
+        qt_hiveParquet8 hiveParquet8
+        qt_hiveOrc1 hiveOrc1
+        qt_hiveOrc2 hiveOrc2
+        qt_hiveOrc3 hiveOrc3
+        qt_hiveOrc4 hiveOrc4
+        qt_hiveOrc5 hiveOrc5
+        qt_hiveOrc6 hiveOrc6
+        qt_hiveOrc7 hiveOrc7
+        qt_hiveOrc8 hiveOrc8
+        qt_icebergParquet1 icebergParquet1
+        qt_icebergParquet2 icebergParquet2
+        qt_icebergParquet3 icebergParquet3
+        qt_icebergParquet4 icebergParquet4
+        qt_icebergParquet5 icebergParquet5
+        qt_icebergParquet6 icebergParquet6
+        qt_icebergParquet7 icebergParquet7
+        qt_icebergParquet8 icebergParquet8
+        qt_icebergOrc1 icebergOrc1
+        qt_icebergOrc2 icebergOrc2
+        qt_icebergOrc3 icebergOrc3
+        qt_icebergOrc4 icebergOrc4
+        qt_icebergOrc5 icebergOrc5
+        qt_icebergOrc6 icebergOrc6
+        qt_icebergOrc7 icebergOrc7
+        qt_icebergOrc8 icebergOrc8
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org