Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/13 00:55:23 UTC

[doris] branch master updated: [Feature][Fix](multi-catalog) Implements transactional hive full acid tables. (#20679)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 73ad885e19 [Feature][Fix](multi-catalog) Implements transactional hive full acid tables. (#20679)
73ad885e19 is described below

commit 73ad885e19ef9d134d22c2c222f991fb6167aec3
Author: Qi Chen <ka...@gmail.com>
AuthorDate: Tue Jun 13 08:55:16 2023 +0800

    [Feature][Fix](multi-catalog) Implements transactional hive full acid tables. (#20679)
    
    After support for insert-only transactional Hive tables was added in #19518 and #19419, this PR implements support for transactional Hive full ACID tables.
    
    Supports Hive 3 transactional full ACID tables.
    Hive 2 transactional full ACID tables require a major compaction to have been run before they can be read.
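    
    For illustration only (not part of the original commit message): assuming a
    Doris HMS catalog named `hive` that points at the test metastore (catalog
    name and metastore URI below are placeholders), the new read path can be
    exercised against the `orc_full_acid` table created by the bundled
    create_preinstalled_table.hql with a query like:
    
        CREATE CATALOG hive PROPERTIES (
            'type' = 'hms',
            'hive.metastore.uris' = 'thrift://127.0.0.1:9083'
        );
        SWITCH hive;
        -- orc_full_acid is a full ACID ORC table with an updated row (id = 3),
        -- so the result should reflect the delete-delta filtering added here.
        SELECT * FROM `default`.orc_full_acid ORDER BY id;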
---
 be/src/service/internal_service.cpp                |   3 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 137 ++++++++++-----
 be/src/vec/exec/format/orc/vorc_reader.h           |  28 ++-
 .../format/table/transactional_hive_common.cpp     |  72 ++++++++
 .../exec/format/table/transactional_hive_common.h  |  54 ++++++
 .../format/table/transactional_hive_reader.cpp     | 191 +++++++++++++++++++++
 .../exec/format/table/transactional_hive_reader.h  | 144 ++++++++++++++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |  25 ++-
 be/src/vec/exprs/vexpr_context.cpp                 |   4 +
 .../hive/scripts/create_preinstalled_table.hql     |  39 +++++
 .../doris/catalog/external/HMSExternalTable.java   |  25 +--
 .../org/apache/doris/common/util/BrokerUtil.java   |   9 +-
 .../org/apache/doris/datasource/hive/AcidInfo.java | 114 ++++++++++++
 .../doris/datasource/hive/HiveMetaStoreCache.java  |  61 ++++++-
 .../doris/datasource/hive/HiveTransaction.java     |   9 +-
 .../datasource/hive/PooledHiveMetaStoreClient.java | 167 ++++++++++++++----
 .../doris/planner/external/FileQueryScanNode.java  |  32 +++-
 .../doris/planner/external/FileScanNode.java       |  16 +-
 .../apache/doris/planner/external/FileSplit.java   |  11 ++
 .../doris/planner/external/HiveScanNode.java       |   8 +-
 .../apache/doris/planner/external/HiveSplit.java   |  69 ++++++++
 .../{TableFormatType.java => SplitCreator.java}    |  18 +-
 .../doris/planner/external/TableFormatType.java    |   3 +-
 gensrc/thrift/PlanNodes.thrift                     |  11 ++
 .../hive/test_transactional_hive.out               |  33 ++++
 .../hive/test_transactional_hive.groovy            |  61 +++++++
 26 files changed, 1205 insertions(+), 139 deletions(-)

diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 502d897a76..c3ed7313e9 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -576,8 +576,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
             break;
         }
         case TFileFormatType::FORMAT_ORC: {
-            std::vector<std::string> column_names;
-            reader = vectorized::OrcReader::create_unique(params, range, column_names, "", &io_ctx);
+            reader = vectorized::OrcReader::create_unique(params, range, "", &io_ctx);
             break;
         }
         case TFileFormatType::FORMAT_JSON: {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index c3d80b6422..9fd4bc57d9 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -69,6 +69,7 @@
 #include "vec/data_types/data_type_map.h"
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/table/transactional_hive_common.h"
 #include "vec/exprs/vbloom_predicate.h"
 #include "vec/exprs/vin_predicate.h"
 #include "vec/exprs/vruntimefilter_wrapper.h"
@@ -86,14 +87,6 @@ enum class FileCachePolicy : uint8_t;
 
 namespace doris::vectorized {
 
-static const char* ACID_EVENT_FIELD_NAMES[] = {"operation", "originalTransaction", "bucket",
-                                               "rowId",     "currentTransaction",  "row"};
-
-static const char* ACID_EVENT_FIELD_NAMES_LOWER_CASE[] = {
-        "operation", "originaltransaction", "bucket", "rowid", "currenttransaction", "row"};
-
-static const int ACID_ROW_OFFSET = 5;
-
 #define FOR_FLAT_ORC_COLUMNS(M)                            \
     M(TypeIndex::Int8, Int8, orc::LongVectorBatch)         \
     M(TypeIndex::UInt8, UInt8, orc::LongVectorBatch)       \
@@ -133,8 +126,8 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
 
 OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
                      const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                     const std::vector<std::string>& column_names, size_t batch_size,
-                     const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat)
+                     size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
+                     bool enable_lazy_mat)
         : _profile(profile),
           _state(state),
           _scan_params(params),
@@ -143,7 +136,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
           _range_start_offset(range.start_offset),
           _range_size(range.size),
           _ctz(ctz),
-          _column_names(column_names),
           _is_hive(params.__isset.slot_name_to_schema_pos),
           _io_ctx(io_ctx),
           _enable_lazy_mat(enable_lazy_mat) {
@@ -154,13 +146,11 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
 }
 
 OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
-                     const std::vector<std::string>& column_names, const std::string& ctz,
-                     io::IOContext* io_ctx, bool enable_lazy_mat)
+                     const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat)
         : _profile(nullptr),
           _scan_params(params),
           _scan_range(range),
           _ctz(ctz),
-          _column_names(column_names),
           _is_hive(params.__isset.slot_name_to_schema_pos),
           _file_system(nullptr),
           _io_ctx(io_ctx),
@@ -240,11 +230,14 @@ Status OrcReader::_create_file_reader() {
 }
 
 Status OrcReader::init_reader(
+        const std::vector<std::string>* column_names,
         std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
-        VExprContextSPtrs& conjuncts) {
+        const VExprContextSPtrs& conjuncts, bool is_acid) {
+    _column_names = column_names;
     _colname_to_value_range = colname_to_value_range;
     _text_converter.reset(new TextConverter('\\'));
     _lazy_read_ctx.conjuncts = conjuncts;
+    _is_acid = is_acid;
     SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
     RETURN_IF_ERROR(_create_file_reader());
     RETURN_IF_ERROR(_init_read_columns());
@@ -254,7 +247,7 @@ Status OrcReader::init_reader(
 Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
                                     std::vector<TypeDescriptor>* col_types) {
     RETURN_IF_ERROR(_create_file_reader());
-    auto& root_type = _remove_acid(_reader->getType());
+    auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
     for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
         col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
         col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i)));
@@ -264,20 +257,20 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
 
 Status OrcReader::_init_read_columns() {
     auto& root_type = _reader->getType();
-    _is_acid = _check_acid_schema(root_type);
-
     std::vector<std::string> orc_cols;
     std::vector<std::string> orc_cols_lower_case;
-    _init_orc_cols(root_type, orc_cols, orc_cols_lower_case);
+    _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map);
 
-    for (auto& col_name : _column_names) {
+    for (size_t i = 0; i < _column_names->size(); ++i) {
+        auto& col_name = (*_column_names)[i];
         if (_is_hive) {
             auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
             if (iter != _scan_params.slot_name_to_schema_pos.end()) {
                 int pos = iter->second;
-                if (_is_acid) {
-                    if (ACID_ROW_OFFSET + 1 + pos < orc_cols_lower_case.size()) {
-                        orc_cols_lower_case[ACID_ROW_OFFSET + 1 + pos] = iter->first;
+                if (_is_acid && i < _column_names->size() - TransactionalHive::READ_PARAMS.size()) {
+                    if (TransactionalHive::ROW_OFFSET + 1 + pos < orc_cols_lower_case.size()) {
+                        // shift TransactionalHive::ROW_OFFSET + 1 offset, 1 is row struct col
+                        orc_cols_lower_case[TransactionalHive::ROW_OFFSET + 1 + pos] = iter->first;
                     }
                 } else {
                     if (pos < orc_cols_lower_case.size()) {
@@ -291,9 +284,11 @@ Status OrcReader::_init_read_columns() {
             _missing_cols.emplace_back(col_name);
         } else {
             int pos = std::distance(orc_cols_lower_case.begin(), iter);
-            if (_is_acid) {
-                auto read_col = fmt::format("{}.{}", ACID_EVENT_FIELD_NAMES[ACID_ROW_OFFSET],
-                                            orc_cols[pos]);
+            if (_is_acid && i < _column_names->size() - TransactionalHive::READ_PARAMS.size()) {
+                auto read_col = fmt::format(
+                        "{}.{}",
+                        TransactionalHive::ACID_COLUMN_NAMES[TransactionalHive::ROW_OFFSET],
+                        orc_cols[pos]);
                 _read_cols.emplace_back(read_col);
             } else {
                 _read_cols.emplace_back(orc_cols[pos]);
@@ -311,14 +306,18 @@ Status OrcReader::_init_read_columns() {
 }
 
 void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
-                               std::vector<std::string>& orc_cols_lower_case) {
+                               std::vector<std::string>& orc_cols_lower_case,
+                               std::unordered_map<std::string, const orc::Type*>& type_map) {
     for (int i = 0; i < type.getSubtypeCount(); ++i) {
         orc_cols.emplace_back(type.getFieldName(i));
-        orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&type, i));
+        auto filed_name_lower_case = _get_field_name_lower_case(&type, i);
+        auto filed_name_lower_case_copy = filed_name_lower_case;
+        orc_cols_lower_case.emplace_back(std::move(filed_name_lower_case));
+        type_map.emplace(std::move(filed_name_lower_case_copy), type.getSubtype(i));
         if (_is_acid) {
             const orc::Type* sub_type = type.getSubtype(i);
             if (sub_type->getKind() == orc::TypeKind::STRUCT) {
-                _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case);
+                _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map);
             }
         }
     }
@@ -326,7 +325,7 @@ void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>&
 
 bool OrcReader::_check_acid_schema(const orc::Type& type) {
     if (orc::TypeKind::STRUCT == type.getKind()) {
-        if (type.getSubtypeCount() != std::size(ACID_EVENT_FIELD_NAMES)) {
+        if (type.getSubtypeCount() != TransactionalHive::ACID_COLUMN_NAMES.size()) {
             return false;
         }
         for (uint64_t i = 0; i < type.getSubtypeCount(); ++i) {
@@ -334,7 +333,7 @@ bool OrcReader::_check_acid_schema(const orc::Type& type) {
             std::string field_name_lower_case = field_name;
             std::transform(field_name.begin(), field_name.end(), field_name_lower_case.begin(),
                            [](unsigned char c) { return std::tolower(c); });
-            if (field_name_lower_case != ACID_EVENT_FIELD_NAMES_LOWER_CASE[i]) {
+            if (field_name_lower_case != TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE[i]) {
                 return false;
             }
         }
@@ -344,7 +343,7 @@ bool OrcReader::_check_acid_schema(const orc::Type& type) {
 
 const orc::Type& OrcReader::_remove_acid(const orc::Type& type) {
     if (_check_acid_schema(type)) {
-        return *(type.getSubtype(ACID_ROW_OFFSET));
+        return *(type.getSubtype(TransactionalHive::ROW_OFFSET));
     } else {
         return type;
     }
@@ -1193,7 +1192,8 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
                 return Status::OK();
             }
         }
-        const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields;
+        std::vector<orc::ColumnVectorBatch*> batch_vec;
+        _fill_batch_vec(batch_vec, _batch.get(), 0);
         for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
             auto& column_with_type_and_name = block->get_by_name(col_name);
             auto& column_ptr = column_with_type_and_name.column;
@@ -1247,14 +1247,32 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
                 _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
 
+        _build_delete_row_filter(block, rr);
+
+        std::vector<uint32_t> columns_to_filter;
+        int column_to_keep = block->columns();
+        columns_to_filter.resize(column_to_keep);
+        for (uint32_t i = 0; i < column_to_keep; ++i) {
+            columns_to_filter[i] = i;
+        }
         if (!_lazy_read_ctx.conjuncts.empty()) {
-            int column_to_keep = block->columns();
             VExprContextSPtrs filter_conjuncts;
             for (auto& conjunct : _lazy_read_ctx.conjuncts) {
                 filter_conjuncts.push_back(conjunct);
             }
-            RETURN_IF_ERROR(
-                    VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, column_to_keep));
+            std::vector<IColumn::Filter*> filters;
+            if (_delete_rows_filter_ptr) {
+                filters.push_back(_delete_rows_filter_ptr.get());
+            }
+            RETURN_IF_CATCH_EXCEPTION(
+                    RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
+                            filter_conjuncts, &filters, block, columns_to_filter, column_to_keep)));
+        } else {
+            if (_delete_rows_filter_ptr) {
+                RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
+                                                                       (*_delete_rows_filter_ptr)));
+            }
+            Block::erase_useless_column(block, column_to_keep);
         }
     }
     return Status::OK();
@@ -1270,12 +1288,48 @@ void OrcReader::_fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
     }
 }
 
+void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
+    // transactional hive orc delete row
+    if (_delete_rows != nullptr) {
+        _delete_rows_filter_ptr.reset(new IColumn::Filter(rows, 1));
+        auto* __restrict _pos_delete_filter_data = _delete_rows_filter_ptr->data();
+        const ColumnInt64& original_transaction_column =
+                assert_cast<const ColumnInt64&>(*remove_nullable(
+                        block->get_by_name(TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE)
+                                .column));
+        const ColumnInt32& bucket_id_column = assert_cast<const ColumnInt32&>(
+                *remove_nullable(block->get_by_name(TransactionalHive::BUCKET_LOWER_CASE).column));
+        const ColumnInt64& row_id_column = assert_cast<const ColumnInt64&>(
+                *remove_nullable(block->get_by_name(TransactionalHive::ROW_ID_LOWER_CASE).column));
+        for (int i = 0; i < rows; ++i) {
+            Int64 original_transaction = original_transaction_column.get_int(i);
+            Int32 bucket_id = bucket_id_column.get_int(i);
+            Int64 row_id = row_id_column.get_int(i);
+
+            TransactionalHiveReader::AcidRowID transactional_row_id = {original_transaction,
+                                                                       bucket_id, row_id};
+            if (_delete_rows->contains(transactional_row_id)) {
+                _pos_delete_filter_data[i] = 0;
+            }
+        }
+    }
+}
+
 Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
     Block* block = (Block*)arg;
     size_t origin_column_num = block->columns();
 
-    const auto& batch_vec = down_cast<orc::StructVectorBatch*>(&data)->fields;
-    for (auto& col_name : _lazy_read_ctx.predicate_columns.first) {
+    std::vector<orc::ColumnVectorBatch*> batch_vec;
+    _fill_batch_vec(batch_vec, &data, 0);
+    std::vector<string> col_names;
+    col_names.insert(col_names.end(), _lazy_read_ctx.predicate_columns.first.begin(),
+                     _lazy_read_ctx.predicate_columns.first.end());
+    if (_is_acid) {
+        col_names.insert(col_names.end(),
+                         TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
+                         TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
+    }
+    for (auto& col_name : col_names) {
         auto& column_with_type_and_name = block->get_by_name(col_name);
         auto& column_ptr = column_with_type_and_name.column;
         auto& column_type = column_with_type_and_name.type;
@@ -1295,6 +1349,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
         _lazy_read_ctx.resize_first_column = true;
     }
 
+    // transactional hive orc delete row
+    _build_delete_row_filter(block, size);
+
     _filter.reset(new IColumn::Filter(size, 1));
     auto* __restrict result_filter_data = _filter->data();
     bool can_filter_all = false;
@@ -1310,8 +1367,8 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
     }
 
     if (can_filter_all) {
-        for (auto& col : _lazy_read_ctx.predicate_columns.first) {
-            // clean block to read predicate columns
+        for (auto& col : col_names) {
+            // clean block to read predicate columns and acid columns
             block->get_by_name(col).column->assume_mutable()->clear();
         }
         for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 1230b782f8..2a62bfb93c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -49,6 +49,7 @@
 #include "vec/data_types/data_type_nullable.h"
 #include "vec/exec/format/format_common.h"
 #include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/table/transactional_hive_reader.h"
 
 namespace doris {
 class RuntimeState;
@@ -131,19 +132,18 @@ public:
     };
 
     OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
-              const TFileRangeDesc& range, const std::vector<std::string>& column_names,
-              size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
-              bool enable_lazy_mat = true);
+              const TFileRangeDesc& range, size_t batch_size, const std::string& ctz,
+              io::IOContext* io_ctx, bool enable_lazy_mat = true);
 
     OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
-              const std::vector<std::string>& column_names, const std::string& ctz,
-              io::IOContext* io_ctx, bool enable_lazy_mat = true);
+              const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true);
 
     ~OrcReader() override;
 
     Status init_reader(
+            const std::vector<std::string>* column_names,
             std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
-            VExprContextSPtrs& conjuncts);
+            const VExprContextSPtrs& conjuncts, bool is_acid);
 
     Status set_fill_columns(
             const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -165,6 +165,8 @@ public:
     void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
                          orc::ColumnVectorBatch* batch, int idx);
 
+    void _build_delete_row_filter(const Block* block, size_t rows);
+
     void close();
 
     int64_t size() const;
@@ -178,6 +180,10 @@ public:
 
     Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg);
 
+    void set_delete_rows(const TransactionalHiveReader::AcidRowIDSet* delete_rows) {
+        _delete_rows = delete_rows;
+    }
+
 private:
     struct OrcProfile {
         RuntimeProfile::Counter* read_time;
@@ -211,7 +217,8 @@ private:
     void _init_profile();
     Status _init_read_columns();
     void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
-                        std::vector<std::string>& orc_cols_lower_case);
+                        std::vector<std::string>& orc_cols_lower_case,
+                        std::unordered_map<std::string, const orc::Type*>& type_map);
     static bool _check_acid_schema(const orc::Type& type);
     static const orc::Type& _remove_acid(const orc::Type& type);
     TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
@@ -423,7 +430,7 @@ private:
     int64_t _range_start_offset;
     int64_t _range_size;
     const std::string& _ctz;
-    const std::vector<std::string>& _column_names;
+    const std::vector<std::string>* _column_names;
     cctz::time_zone _time_zone;
 
     std::list<std::string> _read_cols;
@@ -437,6 +444,7 @@ private:
     // Flag for hive engine. True if the external table engine is Hive.
     bool _is_hive = false;
     std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
+    std::unordered_map<std::string, const orc::Type*> _type_map;
     std::vector<const orc::Type*> _col_orc_type;
     std::unique_ptr<ORCFileInputStream> _file_input_stream;
     Statistics _statistics;
@@ -459,10 +467,12 @@ private:
     size_t _decimal_scale_params_index;
 
     std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    bool _is_acid = false;
     std::unique_ptr<IColumn::Filter> _filter = nullptr;
     LazyReadContext _lazy_read_ctx;
     std::unique_ptr<TextConverter> _text_converter = nullptr;
-    bool _is_acid = false;
+    const TransactionalHiveReader::AcidRowIDSet* _delete_rows = nullptr;
+    std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr = nullptr;
 };
 
 class ORCFileInputStream : public orc::InputStream {
diff --git a/be/src/vec/exec/format/table/transactional_hive_common.cpp b/be/src/vec/exec/format/table/transactional_hive_common.cpp
new file mode 100644
index 0000000000..85e279031c
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_common.cpp
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transactional_hive_common.h"
+
+namespace doris::vectorized {
+
+const std::string TransactionalHive::OPERATION = "operation";
+const std::string TransactionalHive::ORIGINAL_TRANSACTION = "originalTransaction";
+const std::string TransactionalHive::BUCKET = "bucket";
+const std::string TransactionalHive::ROW_ID = "rowId";
+const std::string TransactionalHive::CURRENT_TRANSACTION = "currentTransaction";
+const std::string TransactionalHive::ROW = "row";
+
+const std::string TransactionalHive::OPERATION_LOWER_CASE = "operation";
+const std::string TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE = "originaltransaction";
+const std::string TransactionalHive::BUCKET_LOWER_CASE = "bucket";
+const std::string TransactionalHive::ROW_ID_LOWER_CASE = "rowid";
+const std::string TransactionalHive::CURRENT_TRANSACTION_LOWER_CASE = "currenttransaction";
+const std::string TransactionalHive::ROW_LOWER_CASE = "row";
+
+const int TransactionalHive::ROW_OFFSET = 5;
+
+const std::vector<TransactionalHive::Param> TransactionalHive::DELETE_ROW_PARAMS = {
+        {TransactionalHive::ORIGINAL_TRANSACTION,
+         TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE, PrimitiveType::TYPE_BIGINT},
+        {TransactionalHive::BUCKET, TransactionalHive::BUCKET_LOWER_CASE, PrimitiveType::TYPE_INT},
+        {TransactionalHive::ROW_ID, TransactionalHive::ROW_ID_LOWER_CASE,
+         PrimitiveType::TYPE_BIGINT}};
+const std::vector<TransactionalHive::Param> TransactionalHive::READ_PARAMS = {
+        {TransactionalHive::ORIGINAL_TRANSACTION,
+         TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE, PrimitiveType::TYPE_BIGINT},
+        {TransactionalHive::BUCKET, TransactionalHive::BUCKET_LOWER_CASE, PrimitiveType::TYPE_INT},
+        {TransactionalHive::ROW_ID, TransactionalHive::ROW_ID_LOWER_CASE,
+         PrimitiveType::TYPE_BIGINT}};
+const std::vector<std::string> TransactionalHive::DELETE_ROW_COLUMN_NAMES = {
+        DELETE_ROW_PARAMS[0].column_name, DELETE_ROW_PARAMS[1].column_name,
+        DELETE_ROW_PARAMS[2].column_name};
+
+const std::vector<std::string> TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE = {
+        DELETE_ROW_PARAMS[0].column_lower_case, DELETE_ROW_PARAMS[1].column_lower_case,
+        DELETE_ROW_PARAMS[2].column_lower_case};
+
+const std::vector<std::string> TransactionalHive::READ_ROW_COLUMN_NAMES = {
+        READ_PARAMS[0].column_name, READ_PARAMS[1].column_name, READ_PARAMS[2].column_name};
+
+const std::vector<std::string> TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE = {
+        READ_PARAMS[0].column_lower_case, READ_PARAMS[1].column_lower_case,
+        READ_PARAMS[2].column_lower_case};
+
+const std::vector<std::string> TransactionalHive::ACID_COLUMN_NAMES = {
+        OPERATION, ORIGINAL_TRANSACTION, BUCKET, ROW_ID, CURRENT_TRANSACTION, ROW};
+
+const std::vector<std::string> TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE = {
+        OPERATION_LOWER_CASE, ORIGINAL_TRANSACTION_LOWER_CASE, BUCKET_LOWER_CASE,
+        ROW_ID_LOWER_CASE,    CURRENT_TRANSACTION_LOWER_CASE,  ROW_LOWER_CASE};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_common.h b/be/src/vec/exec/format/table/transactional_hive_common.h
new file mode 100644
index 0000000000..368af350cf
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_common.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "runtime/define_primitive_type.h"
+
+namespace doris::vectorized {
+struct TransactionalHive {
+    static const std::string OPERATION;
+    static const std::string ORIGINAL_TRANSACTION;
+    static const std::string BUCKET;
+    static const std::string ROW_ID;
+    static const std::string CURRENT_TRANSACTION;
+    static const std::string ROW;
+    static const std::string OPERATION_LOWER_CASE;
+    static const std::string ORIGINAL_TRANSACTION_LOWER_CASE;
+    static const std::string BUCKET_LOWER_CASE;
+    static const std::string ROW_ID_LOWER_CASE;
+    static const std::string CURRENT_TRANSACTION_LOWER_CASE;
+    static const std::string ROW_LOWER_CASE;
+    static const int ROW_OFFSET;
+    struct Param {
+        const std::string column_name;
+        const std::string column_lower_case;
+        const PrimitiveType type;
+    };
+    static const std::vector<Param> DELETE_ROW_PARAMS;
+    static const std::vector<Param> READ_PARAMS;
+    static const std::vector<std::string> DELETE_ROW_COLUMN_NAMES;
+    static const std::vector<std::string> DELETE_ROW_COLUMN_NAMES_LOWER_CASE;
+    static const std::vector<std::string> READ_ROW_COLUMN_NAMES;
+    static const std::vector<std::string> READ_ROW_COLUMN_NAMES_LOWER_CASE;
+    static const std::vector<std::string> ACID_COLUMN_NAMES;
+    static const std::vector<std::string> ACID_COLUMN_NAMES_LOWER_CASE;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
new file mode 100644
index 0000000000..a2ae767d56
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transactional_hive_reader.h"
+
+#include "runtime/runtime_state.h"
+#include "transactional_hive_common.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/format/orc/vorc_reader.h"
+
+namespace doris {
+
+namespace io {
+class IOContext;
+} // namespace io
+namespace vectorized {
+class VExprContext;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+
+TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader> file_format_reader,
+                                                 RuntimeProfile* profile, RuntimeState* state,
+                                                 const TFileScanRangeParams& params,
+                                                 const TFileRangeDesc& range, io::IOContext* io_ctx)
+        : TableFormatReader(std::move(file_format_reader)),
+          _profile(profile),
+          _state(state),
+          _params(params),
+          _range(range),
+          _io_ctx(io_ctx) {
+    static const char* transactional_hive_profile = "TransactionalHiveProfile";
+    ADD_TIMER(_profile, transactional_hive_profile);
+    _transactional_orc_profile.num_delete_files =
+            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, transactional_hive_profile);
+    _transactional_orc_profile.num_delete_rows =
+            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, transactional_hive_profile);
+    _transactional_orc_profile.delete_files_read_time =
+            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", transactional_hive_profile);
+}
+
+Status TransactionalHiveReader::init_reader(
+        const std::vector<std::string>& column_names,
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+        const VExprContextSPtrs& conjuncts) {
+    OrcReader* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
+    _col_names.insert(_col_names.end(), column_names.begin(), column_names.end());
+    _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
+                      TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
+    Status status = orc_reader->init_reader(&_col_names, colname_to_value_range, conjuncts, true);
+    return status;
+}
+
+Status TransactionalHiveReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    for (int i = 0; i < TransactionalHive::READ_PARAMS.size(); ++i) {
+        DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
+                TypeDescriptor(TransactionalHive::READ_PARAMS[i].type), false);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+                                            TransactionalHive::READ_PARAMS[i].column_lower_case));
+    }
+    auto res = _file_format_reader->get_next_block(block, read_rows, eof);
+    Block::erase_useless_column(block, block->columns() - TransactionalHive::READ_PARAMS.size());
+    return res;
+}
+
+Status TransactionalHiveReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
+    return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
+bool TransactionalHiveReader::fill_all_columns() const {
+    return _file_format_reader->fill_all_columns();
+};
+
+Status TransactionalHiveReader::get_columns(
+        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+        std::unordered_set<std::string>* missing_cols) {
+    return _file_format_reader->get_columns(name_to_type, missing_cols);
+}
+
+Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) {
+    std::string data_file_path = _range.path;
+    // the path in _range is remove the namenode prefix,
+    // and the file_path in delete file is full path, so we should add it back.
+    if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
+        std::string fs_name = _params.hdfs_params.fs_name;
+        if (!starts_with(data_file_path, fs_name)) {
+            data_file_path = fs_name + data_file_path;
+        }
+    }
+
+    OrcReader* orc_reader = (OrcReader*)(_file_format_reader.get());
+    std::vector<std::string> delete_file_col_names;
+    int64_t num_delete_rows = 0;
+    int64_t num_delete_files = 0;
+    std::filesystem::path file_path(data_file_path);
+
+    SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
+    for (auto& delete_delta : range.table_format_params.transactional_hive_params.delete_deltas) {
+        const std::string file_name = file_path.filename().string();
+        auto iter = std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
+                              file_name);
+        if (iter == delete_delta.file_names.end()) {
+            continue;
+        }
+        auto delete_file = fmt::format("{}/{}", delete_delta.directory_location, file_name);
+
+        TFileRangeDesc delete_range;
+        delete_range.path = delete_file;
+        delete_range.start_offset = 0;
+        delete_range.size = -1;
+        delete_range.file_size = -1;
+
+        OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
+                                _state->timezone(), _io_ctx, false);
+
+        RETURN_IF_ERROR(delete_reader.init_reader(
+                &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, nullptr, {}, false));
+
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                partition_columns;
+        std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+        delete_reader.set_fill_columns(partition_columns, missing_columns);
+
+        bool eof = false;
+        while (!eof) {
+            Block block;
+            for (int i = 0; i < TransactionalHive::DELETE_ROW_PARAMS.size(); ++i) {
+                DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
+                        TransactionalHive::DELETE_ROW_PARAMS[i].type, false);
+                MutableColumnPtr data_column = data_type->create_column();
+                block.insert(ColumnWithTypeAndName(
+                        std::move(data_column), data_type,
+                        TransactionalHive::DELETE_ROW_PARAMS[i].column_lower_case));
+            }
+            eof = false;
+            size_t read_rows = 0;
+            RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+            if (read_rows > 0) {
+                static int ORIGINAL_TRANSACTION_INDEX = 0;
+                static int BUCKET_ID_INDEX = 1;
+                static int ROW_ID_INDEX = 2;
+                const ColumnInt64& original_transaction_column = assert_cast<const ColumnInt64&>(
+                        *block.get_by_position(ORIGINAL_TRANSACTION_INDEX).column);
+                const ColumnInt32& bucket_id_column = assert_cast<const ColumnInt32&>(
+                        *block.get_by_position(BUCKET_ID_INDEX).column);
+                const ColumnInt64& row_id_column = assert_cast<const ColumnInt64&>(
+                        *block.get_by_position(ROW_ID_INDEX).column);
+
+                DCHECK_EQ(original_transaction_column.size(), read_rows);
+                DCHECK_EQ(bucket_id_column.size(), read_rows);
+                DCHECK_EQ(row_id_column.size(), read_rows);
+
+                for (int i = 0; i < read_rows; ++i) {
+                    Int64 original_transaction = original_transaction_column.get_int(i);
+                    Int32 bucket_id = bucket_id_column.get_int(i);
+                    Int64 row_id = row_id_column.get_int(i);
+                    AcidRowID delete_row_id = {original_transaction, bucket_id, row_id};
+                    _delete_rows.insert(delete_row_id);
+                    ++num_delete_rows;
+                }
+            }
+        }
+        ++num_delete_files;
+    }
+    if (num_delete_rows > 0) {
+        orc_reader->set_delete_rows(&_delete_rows);
+        COUNTER_UPDATE(_transactional_orc_profile.num_delete_files, num_delete_files);
+        COUNTER_UPDATE(_transactional_orc_profile.num_delete_rows, num_delete_rows);
+    }
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h
new file mode 100644
index 0000000000..b19102a366
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "table_format_reader.h"
+#include "util/runtime_profile.h"
+#include "vec/columns/column_dictionary.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
+
+namespace doris {
+class RuntimeState;
+class SlotDescriptor;
+class TFileRangeDesc;
+class TFileScanRangeParams;
+
+namespace io {
+class IOContext;
+} // namespace io
+struct TypeDescriptor;
+
+namespace vectorized {
+class Block;
+class GenericReader;
+class ShardedKVCache;
+class VExprContext;
+
+class TransactionalHiveReader : public TableFormatReader {
+    ENABLE_FACTORY_CREATOR(TransactionalHiveReader);
+
+public:
+    struct AcidRowID {
+        int64_t original_transaction;
+        int32_t bucket;
+        int64_t row_id;
+
+        struct Hash {
+            size_t operator()(const AcidRowID& transactional_row_id) const {
+                size_t hash_value = 0;
+                hash_value ^= std::hash<int64_t> {}(transactional_row_id.original_transaction) +
+                              0x9e3779b9 + (hash_value << 6) + (hash_value >> 2);
+                hash_value ^= std::hash<int32_t> {}(transactional_row_id.bucket) + 0x9e3779b9 +
+                              (hash_value << 6) + (hash_value >> 2);
+                hash_value ^= std::hash<int64_t> {}(transactional_row_id.row_id) + 0x9e3779b9 +
+                              (hash_value << 6) + (hash_value >> 2);
+                return hash_value;
+            }
+        };
+
+        struct Eq {
+            bool operator()(const AcidRowID& lhs, const AcidRowID& rhs) const {
+                return lhs.original_transaction == rhs.original_transaction &&
+                       lhs.bucket == rhs.bucket && lhs.row_id == rhs.row_id;
+            }
+        };
+    };
+
+    using AcidRowIDSet = vectorized::flat_hash_set<AcidRowID, AcidRowID::Hash, AcidRowID::Eq>;
+
+    TransactionalHiveReader(std::unique_ptr<GenericReader> file_format_reader,
+                            RuntimeProfile* profile, RuntimeState* state,
+                            const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                            io::IOContext* io_ctx);
+    ~TransactionalHiveReader() override = default;
+
+    Status init_row_filters(const TFileRangeDesc& range) override;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;
+
+    bool fill_all_columns() const override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_reader(
+            const std::vector<std::string>& column_names,
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+            const VExprContextSPtrs& conjuncts);
+
+private:
+    struct TransactionalHiveProfile {
+        RuntimeProfile::Counter* num_delete_files;
+        RuntimeProfile::Counter* num_delete_rows;
+        RuntimeProfile::Counter* delete_files_read_time;
+    };
+
+    RuntimeProfile* _profile;
+    RuntimeState* _state;
+    const TFileScanRangeParams& _params;
+    const TFileRangeDesc& _range;
+    TransactionalHiveProfile _transactional_orc_profile;
+    AcidRowIDSet _delete_rows;
+    std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr = nullptr;
+    std::vector<std::string> _col_names;
+
+    io::IOContext* _io_ctx;
+};
+
+inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs,
+                      const TransactionalHiveReader::AcidRowID& rhs) {
+    if (lhs.original_transaction != rhs.original_transaction) {
+        return lhs.original_transaction < rhs.original_transaction;
+    } else if (lhs.bucket != rhs.bucket) {
+        return lhs.bucket < rhs.bucket;
+    } else if (lhs.row_id != rhs.row_id) {
+        return lhs.row_id < rhs.row_id;
+    } else {
+        return false;
+    }
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 8772d14802..bca5cc550d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -60,6 +60,7 @@
 #include "vec/exec/format/orc/vorc_reader.h"
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/iceberg_reader.h"
+#include "vec/exec/format/table/transactional_hive_reader.h"
 #include "vec/exec/scan/hudi_jni_reader.h"
 #include "vec/exec/scan/max_compute_jni_reader.h"
 #include "vec/exec/scan/new_file_scan_node.h"
@@ -651,6 +652,9 @@ Status VFileScanner::_get_next_reader() {
             break;
         }
         case TFileFormatType::FORMAT_ORC: {
+            std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
+                    _profile, _state, _params, range, _state->query_options().batch_size,
+                    _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
             if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
                 _push_down_conjuncts.resize(_conjuncts.size());
                 for (size_t i = 0; i != _conjuncts.size(); ++i) {
@@ -658,12 +662,21 @@ Status VFileScanner::_get_next_reader() {
                 }
                 _discard_conjuncts();
             }
-            _cur_reader = OrcReader::create_unique(
-                    _profile, _state, _params, range, _file_col_names,
-                    _state->query_options().batch_size, _state->timezone(), _io_ctx.get(),
-                    _state->query_options().enable_orc_lazy_mat);
-            init_status = ((OrcReader*)(_cur_reader.get()))
-                                  ->init_reader(_colname_to_value_range, _push_down_conjuncts);
+            if (range.__isset.table_format_params &&
+                range.table_format_params.table_format_type == "transactional_hive") {
+                std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
+                        TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
+                                                               _state, _params, range,
+                                                               _io_ctx.get());
+                init_status = tran_orc_reader->init_reader(_file_col_names, _colname_to_value_range,
+                                                           _push_down_conjuncts);
+                RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range));
+                _cur_reader = std::move(tran_orc_reader);
+            } else {
+                init_status = orc_reader->init_reader(&_file_col_names, _colname_to_value_range,
+                                                      _push_down_conjuncts, false);
+                _cur_reader = std::move(orc_reader);
+            }
             break;
         }
         case TFileFormatType::FORMAT_CSV_PLAIN:
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 88796ecc24..50d17ff531 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -221,6 +221,10 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
             for (size_t i = 0; i < size; ++i) {
                 result_filter_data[i] &= filter_data[i];
             }
+            if (memchr(result_filter_data, 0x1, size) == nullptr) {
+                *can_filter_all = true;
+                return Status::OK();
+            }
         }
     }
     return Status::OK();
diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
index 4c6108e9d4..69c58d0ac8 100644
--- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
+++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
@@ -621,4 +621,43 @@ insert into `schema_evo_test_orc` select 1, "kaka";
 alter table `schema_evo_test_orc` ADD COLUMNS (`ts` timestamp);
 insert into `schema_evo_test_orc` select 2, "messi", from_unixtime(to_unix_timestamp('20230101 13:01:03','yyyyMMdd HH:mm:ss'));
 
+-- Currently docker is hive 2.x version. Hive 2.x versioned full-acid tables need to run major compaction.
+SET hive.support.concurrency=true;
+SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+create table orc_full_acid (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+
+insert into orc_full_acid values
+(1, 'A'),
+(2, 'B'),
+(3, 'C');
+
+update orc_full_acid set value = 'CC' where id = 3;
+
+alter table orc_full_acid compact 'major';
+
+create table orc_full_acid_par (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+
+insert into orc_full_acid_par PARTITION(part_col=20230101) values
+(1, 'A'),
+(2, 'B'),
+(3, 'C');
+
+insert into orc_full_acid_par PARTITION(part_col=20230102) values
+(4, 'D'),
+(5, 'E'),
+(6, 'F');
+
+update orc_full_acid_par set value = 'BB' where id = 2;
+
+alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
+alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';
+
 show tables;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 98c941e987..64e06783c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -164,27 +164,6 @@ public class HMSExternalTable extends ExternalTable {
      * Support managed_table and external_table.
      */
     private boolean supportedHiveTable() {
-        boolean isTxnTbl = AcidUtils.isTransactionalTable(remoteTable);
-        if (isTxnTbl) {
-            // Only support "insert_only" transactional table
-            // There are 2 types of parameter:
-            //  "transactional_properties" = "insert_only",
-            //  or,
-            //  "insert_only" = "true"
-            // And must check "insert_only" first, because "transactional_properties" may be "default"
-            Map<String, String> parameters = remoteTable.getParameters();
-            if (parameters.containsKey(TBL_PROP_INSERT_ONLY)) {
-                if (!parameters.get(TBL_PROP_INSERT_ONLY).equalsIgnoreCase("true")) {
-                    return false;
-                }
-            } else if (parameters.containsKey(TBL_PROP_TXN_PROPERTIES)) {
-                if (!parameters.get(TBL_PROP_TXN_PROPERTIES).equalsIgnoreCase(TBL_PROP_INSERT_ONLY)) {
-                    return false;
-                }
-            } else {
-                return false;
-            }
-        }
         String inputFileFormat = remoteTable.getSd().getInputFormat();
         boolean supportedFileFormat = inputFileFormat != null && SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat);
         LOG.debug("hms table {} is {} with file format: {}", name, remoteTable.getTableType(), inputFileFormat);
@@ -215,6 +194,10 @@ public class HMSExternalTable extends ExternalTable {
         return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable);
     }
 
+    public boolean isFullAcidTable() {
+        return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable);
+    }
+
     @Override
     public boolean isView() {
         makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index d19693e9f4..60f454999e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -114,17 +114,20 @@ public class BrokerUtil {
 
     public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath)
             throws UserException {
-        return parseColumnsFromPath(filePath, columnsFromPath, true);
+        return parseColumnsFromPath(filePath, columnsFromPath, true, false);
     }
 
     public static List<String> parseColumnsFromPath(
             String filePath,
             List<String> columnsFromPath,
-            boolean caseSensitive)
+            boolean caseSensitive,
+            boolean isACID)
             throws UserException {
         if (columnsFromPath == null || columnsFromPath.isEmpty()) {
             return Collections.emptyList();
         }
+        // if it is ACID, the path count is 3. The hdfs path is hdfs://xxx/table_name/par=xxx/delta(or base)_xxx/.
+        int pathCount = isACID ? 3 : 2;
         if (!caseSensitive) {
             for (int i = 0; i < columnsFromPath.size(); i++) {
                 String path = columnsFromPath.remove(i);
@@ -138,7 +141,7 @@ public class BrokerUtil {
         }
         String[] columns = new String[columnsFromPath.size()];
         int size = 0;
-        for (int i = strings.length - 2; i >= 0; i--) {
+        for (int i = strings.length - pathCount; i >= 0; i--) {
             String str = strings[i];
             if (str != null && str.isEmpty()) {
                 continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidInfo.java
new file mode 100644
index 0000000000..c49855fbd1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidInfo.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Stores information about Acid properties of a partition.
+ */
+public class AcidInfo {
+    private final String partitionLocation;
+    private final List<DeleteDeltaInfo> deleteDeltas;
+
+    public AcidInfo(String partitionLocation, List<DeleteDeltaInfo> deleteDeltas) {
+        this.partitionLocation = partitionLocation;
+        this.deleteDeltas = deleteDeltas;
+    }
+
+    public String getPartitionLocation() {
+        return partitionLocation;
+    }
+
+    public List<DeleteDeltaInfo> getDeleteDeltas() {
+        return deleteDeltas;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AcidInfo acidInfo = (AcidInfo) o;
+        return Objects.equals(partitionLocation, acidInfo.partitionLocation) && Objects.equals(
+                deleteDeltas, acidInfo.deleteDeltas);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(partitionLocation, deleteDeltas);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("AcidInfo{");
+        sb.append("partitionLocation='").append(partitionLocation).append('\'');
+        sb.append(", deleteDeltas=").append(deleteDeltas);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    public static class DeleteDeltaInfo {
+        private String directoryLocation;
+        private List<String> fileNames;
+
+        public DeleteDeltaInfo(String location, List<String> filenames) {
+            this.directoryLocation = location;
+            this.fileNames = filenames;
+        }
+
+        public String getDirectoryLocation() {
+            return directoryLocation;
+        }
+
+        public List<String> getFileNames() {
+            return fileNames;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DeleteDeltaInfo that = (DeleteDeltaInfo) o;
+            return Objects.equals(directoryLocation, that.directoryLocation)
+                    && Objects.equals(fileNames, that.fileNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(directoryLocation, fileNames);
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder("DeleteDeltaInfo{");
+            sb.append("directoryLocation='").append(directoryLocation).append('\'');
+            sb.append(", fileNames=").append(fileNames);
+            sb.append('}');
+            return sb.toString();
+        }
+    }
+}
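
As a rough illustration of how the new class is meant to be filled in (all paths below are made up), one AcidInfo is built per partition, carrying the partition location plus one DeleteDeltaInfo per delete_delta_* directory:

    import org.apache.doris.datasource.hive.AcidInfo;
    import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;

    import java.util.Arrays;
    import java.util.Collections;

    public class AcidInfoExample {
        public static void main(String[] args) {
            // One delete-delta directory and the bucket files inside it (paths are made up).
            DeleteDeltaInfo deleteDelta = new DeleteDeltaInfo(
                    "hdfs://nn/warehouse/t/par=20230101/delete_delta_0000002_0000002_0000",
                    Arrays.asList("bucket_00000", "bucket_00001"));

            // One AcidInfo per partition: its location plus all of its delete deltas.
            AcidInfo acidInfo = new AcidInfo("hdfs://nn/warehouse/t/par=20230101",
                    Collections.singletonList(deleteDelta));
            System.out.println(acidInfo);
        }
    }
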
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 25ec9055ed..cbc8631883 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -18,6 +18,8 @@
 package org.apache.doris.datasource.hive;
 
 import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.backup.Status;
+import org.apache.doris.backup.Status.ErrCode;
 import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.catalog.ListPartitionItem;
 import org.apache.doris.catalog.PartitionItem;
@@ -30,6 +32,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.S3Util;
 import org.apache.doris.datasource.CacheException;
 import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.external.hive.util.HiveUtil;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.RemoteFiles;
@@ -78,6 +81,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.URLDecoder;
 import java.nio.charset.StandardCharsets;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +105,8 @@ public class HiveMetaStoreCache {
 // After hive 3, transactional tables will have a file '_orc_acid_version' with value >= '2'.
     public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";
 
+    private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
+
     private HMSExternalCatalog catalog;
 
     private Executor executor;
@@ -662,7 +668,8 @@ public class HiveMetaStoreCache {
         return fileCacheRef;
     }
 
-    public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds) {
+    public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
+            boolean isFullAcid) {
         List<FileCacheValue> fileCacheValues = Lists.newArrayList();
         JobConf jobConf = getJobConf();
         String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -682,12 +689,47 @@ public class HiveMetaStoreCache {
                     throw new Exception("Original non-ACID files in transactional tables are not supported");
                 }
 
+                if (isFullAcid) {
+                    int acidVersion = 2;
+                    /*
+                     * From Hive version >= 3.0, delta/base directories always contain a file
+                     * '_orc_acid_version' with value >= '2'.
+                     */
+                    Path baseOrDeltaPath = directory.getBaseDirectory() != null ? directory.getBaseDirectory() :
+                            !directory.getCurrentDirectories().isEmpty() ? directory.getCurrentDirectories().get(0)
+                                    .getPath() : null;
+                    String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
+                    RemoteFileSystem fs = FileSystemFactory.getByLocation(baseOrDeltaPath.toUri().toString(), jobConf);
+                    Status status = fs.exists(acidVersionPath);
+                    if (status != Status.OK) {
+                        if (status.getErrCode() == ErrCode.NOT_FOUND) {
+                            acidVersion = 0;
+                        } else {
+                            throw new Exception(String.format("Failed to check remote path %s exists.",
+                                    acidVersionPath));
+                        }
+                    }
+                    if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) {
+                        throw new Exception(
+                                "Hive 2.x versioned full-acid tables need to run major compaction.");
+                    }
+                }
+
                 // delta directories
+                List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
                 for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
                     String location = delta.getPath().toString();
                     RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
-                    locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE))
+                    if (delta.isDeleteDelta()) {
+                        List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
+                                        name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+                                        .collect(Collectors.toList());
+                        deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
+                        continue;
+                    }
+                    locatedFiles.files().stream().filter(
+                            f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
                             .forEach(fileCacheValue::addFile);
                 }
 
@@ -696,9 +738,11 @@ public class HiveMetaStoreCache {
                     String location = directory.getBaseDirectory().toString();
                     RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
                     RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
-                    locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE))
+                    locatedFiles.files().stream().filter(
+                            f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
                             .forEach(fileCacheValue::addFile);
                 }
+                fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
                 fileCacheValues.add(fileCacheValue);
             }
         } catch (Exception e) {
@@ -854,6 +898,8 @@ public class HiveMetaStoreCache {
         // partitionValues would be ["part1", "part2"]
         protected List<String> partitionValues;
 
+        private AcidInfo acidInfo;
+
         public void addFile(RemoteFile file) {
             if (files == null) {
                 files = Lists.newArrayList();
@@ -877,6 +923,15 @@ public class HiveMetaStoreCache {
         public int getValuesSize() {
             return partitionValues == null ? 0 : partitionValues.size();
         }
+
+
+        public AcidInfo getAcidInfo() {
+            return acidInfo;
+        }
+
+        public void setAcidInfo(AcidInfo acidInfo) {
+            this.acidInfo = acidInfo;
+        }
     }
 
     @Data
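
As a rough, self-contained sketch of the file classification done in getFilesByTransaction above (directory listings below are made up): only bucket_* files count as data or delete records, while side-car files such as '_orc_acid_version' are skipped.

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class AcidFileFilterSketch {
        private static final String BUCKET_PREFIX = "bucket_";

        // Keep only real bucket files; side-car files such as '_orc_acid_version' are dropped.
        static List<String> bucketFiles(List<String> fileNames) {
            return fileNames.stream()
                    .filter(name -> name.startsWith(BUCKET_PREFIX))
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            // Typical listing of a base/delta directory of a full-ACID ORC table (contents made up).
            List<String> listing = Arrays.asList("_orc_acid_version", "bucket_00000", "bucket_00001");
            System.out.println(bucketFiles(listing)); // [bucket_00000, bucket_00001]
        }
    }
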
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
index 7919d451a2..3f3c2f91d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
@@ -37,15 +37,18 @@ public class HiveTransaction {
     private final String user;
     private final HMSExternalTable hiveTable;
 
+    private final boolean isFullAcid;
+
     private long txnId;
     private List<String> partitionNames = Lists.newArrayList();
 
     ValidWriteIdList validWriteIdList = null;
 
-    public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable) {
+    public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable, boolean isFullAcid) {
         this.queryId = queryId;
         this.user = user;
         this.hiveTable = hiveTable;
+        this.isFullAcid = isFullAcid;
     }
 
     public String getQueryId() {
@@ -56,6 +59,10 @@ public class HiveTransaction {
         this.partitionNames.add(partitionName);
     }
 
+    public boolean isFullAcid() {
+        return isFullAcid;
+    }
+
     public ValidWriteIdList getValidWriteIds(PooledHiveMetaStoreClient client) {
         if (validWriteIdList == null) {
             TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
index e26a4c9284..9f9a1a457f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
@@ -27,6 +27,8 @@ import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
 import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
@@ -85,7 +88,12 @@ public class PooledHiveMetaStoreClient {
 
     public List<String> getAllDatabases() {
         try (CachedClient client = getClient()) {
-            return client.client.getAllDatabases();
+            try {
+                return client.client.getAllDatabases();
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to get all database from hms client", e);
         }
@@ -93,7 +101,12 @@ public class PooledHiveMetaStoreClient {
 
     public List<String> getAllTables(String dbName) {
         try (CachedClient client = getClient()) {
-            return client.client.getAllTables(dbName);
+            try {
+                return client.client.getAllTables(dbName);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to get all tables for db %s", e, dbName);
         }
@@ -101,7 +114,12 @@ public class PooledHiveMetaStoreClient {
 
     public boolean tableExists(String dbName, String tblName) {
         try (CachedClient client = getClient()) {
-            return client.client.tableExists(dbName, tblName);
+            try {
+                return client.client.tableExists(dbName, tblName);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to check if table %s in db %s exists", e, tblName, dbName);
         }
@@ -109,7 +127,12 @@ public class PooledHiveMetaStoreClient {
 
     public List<String> listPartitionNames(String dbName, String tblName) {
         try (CachedClient client = getClient()) {
-            return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+            try {
+                return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to list partition names for table %s in db %s", e, tblName, dbName);
         }
@@ -117,7 +140,12 @@ public class PooledHiveMetaStoreClient {
 
     public Partition getPartition(String dbName, String tblName, List<String> partitionValues) {
         try (CachedClient client = getClient()) {
-            return client.client.getPartition(dbName, tblName, partitionValues);
+            try {
+                return client.client.getPartition(dbName, tblName, partitionValues);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to get partition for table %s in db %s with value %s", e, tblName,
                     dbName, partitionValues);
@@ -126,7 +154,12 @@ public class PooledHiveMetaStoreClient {
 
     public List<Partition> getPartitionsByFilter(String dbName, String tblName, String filter) {
         try (CachedClient client = getClient()) {
-            return client.client.listPartitionsByFilter(dbName, tblName, filter, MAX_LIST_PARTITION_NUM);
+            try {
+                return client.client.listPartitionsByFilter(dbName, tblName, filter, MAX_LIST_PARTITION_NUM);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to get partition by filter for table %s in db %s", e, tblName,
                     dbName);
@@ -135,7 +168,12 @@ public class PooledHiveMetaStoreClient {
 
     public Table getTable(String dbName, String tblName) {
         try (CachedClient client = getClient()) {
-            return client.client.getTable(dbName, tblName);
+            try {
+                return client.client.getTable(dbName, tblName);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to get table %s in db %s from hms client", e, tblName, dbName);
         }
@@ -143,7 +181,12 @@ public class PooledHiveMetaStoreClient {
 
     public List<FieldSchema> getSchema(String dbName, String tblName) {
         try (CachedClient client = getClient()) {
-            return client.client.getSchema(dbName, tblName);
+            try {
+                return client.client.getSchema(dbName, tblName);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new HMSClientException("failed to get schema for table %s in db %s", e, tblName, dbName);
         }
@@ -151,7 +194,12 @@ public class PooledHiveMetaStoreClient {
 
     public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
         try (CachedClient client = getClient()) {
-            return client.client.getTableColumnStatistics(dbName, tblName, columns);
+            try {
+                return client.client.getTableColumnStatistics(dbName, tblName, columns);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -160,7 +208,12 @@ public class PooledHiveMetaStoreClient {
     public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
             String dbName, String tblName, List<String> partNames, List<String> columns) {
         try (CachedClient client = getClient()) {
-            return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
+            try {
+                return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
@@ -168,7 +221,12 @@ public class PooledHiveMetaStoreClient {
 
     public CurrentNotificationEventId getCurrentNotificationEventId() {
         try (CachedClient client = getClient()) {
-            return client.client.getCurrentNotificationEventId();
+            try {
+                return client.client.getCurrentNotificationEventId();
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             LOG.warn("Failed to fetch current notification event id", e);
             throw new MetastoreNotificationFetchException(
@@ -181,7 +239,12 @@ public class PooledHiveMetaStoreClient {
             IMetaStoreClient.NotificationFilter filter)
             throws MetastoreNotificationFetchException {
         try (CachedClient client = getClient()) {
-            return client.client.getNextNotification(lastEventId, maxEvents, filter);
+            try {
+                return client.client.getNextNotification(lastEventId, maxEvents, filter);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             LOG.warn("Failed to get next notification based on last event id {}", lastEventId, e);
             throw new MetastoreNotificationFetchException(
@@ -192,7 +255,12 @@ public class PooledHiveMetaStoreClient {
 
     public long openTxn(String user) {
         try (CachedClient client = getClient()) {
-            return client.client.openTxn(user);
+            try {
+                return client.client.openTxn(user);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to open transaction", e);
         }
@@ -200,7 +268,12 @@ public class PooledHiveMetaStoreClient {
 
     public void commitTxn(long txnId) {
         try (CachedClient client = getClient()) {
-            client.client.commitTxn(txnId);
+            try {
+                client.client.commitTxn(txnId);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to commit transaction " + txnId, e);
         }
@@ -214,7 +287,13 @@ public class PooledHiveMetaStoreClient {
             request.addLockComponent(component);
         }
         try (CachedClient client = getClient()) {
-            LockResponse response = client.client.lock(request.build());
+            LockResponse response;
+            try {
+                response = client.client.lock(request.build());
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
             long start = System.currentTimeMillis();
             while (response.getState() == LockState.WAITING) {
                 long lockId = response.getLockid();
@@ -236,28 +315,40 @@ public class PooledHiveMetaStoreClient {
 
     public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
         try (CachedClient client = getClient()) {
-            // Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
-            // Do not pass currentTransactionId instead as it will break Hive's listing of delta directories if major compaction
-            // deletes delta directories for valid transactions that existed at the time transaction is opened
-            ValidTxnList validTransactions = client.client.getValidTxns();
-            List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
-                    Collections.singletonList(fullTableName), validTransactions.toString());
-            if (tableValidWriteIdsList.size() != 1) {
-                throw new Exception("tableValidWriteIdsList's size should be 1");
+            try {
+                // Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
+                // Do not pass currentTransactionId instead as it will break Hive's listing of delta directories if major compaction
+                // deletes delta directories for valid transactions that existed at the time transaction is opened
+                ValidTxnList validTransactions = client.client.getValidTxns();
+                List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
+                        Collections.singletonList(fullTableName), validTransactions.toString());
+                if (tableValidWriteIdsList.size() != 1) {
+                    throw new Exception("tableValidWriteIdsList's size should be 1");
+                }
+                ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId,
+                        tableValidWriteIdsList);
+                ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
+                return writeIdList;
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
             }
-            ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId,
-                    tableValidWriteIdsList);
-            ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
-            return writeIdList;
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    "failed to get valid write ids for " + fullTableName + ", transaction " + currentTransactionId, e);
+        } catch (Exception e) {
+            // Ignore this exception when the Hive version is not compatible with these APIs.
+            // The current workaround is to fall back to a max watermark, which treats all write ids as valid.
+            LOG.warn("failed to get valid write ids for {}, transaction {}", fullTableName, currentTransactionId, e);
+            return new ValidReaderWriteIdList(fullTableName, new long[0], new BitSet(), Long.MAX_VALUE);
         }
     }
 
     private LockResponse checkLock(long lockId) {
         try (CachedClient client = getClient()) {
-            return client.client.checkLock(lockId);
+            try {
+                return client.client.checkLock(lockId);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to check lock " + lockId, e);
         }
@@ -289,7 +380,12 @@ public class PooledHiveMetaStoreClient {
 
     public void heartbeatForTxn(long txnId) {
         try (CachedClient client = getClient()) {
-            client.client.heartbeat(txnId, txnId);
+            try {
+                client.client.heartbeat(txnId, txnId);
+            } catch (Exception e) {
+                client.setThrowable(e);
+                throw e;
+            }
         } catch (Exception e) {
             throw new RuntimeException("failed to do heartbeat for transaction " + txnId, e);
         }
@@ -297,6 +393,7 @@ public class PooledHiveMetaStoreClient {
 
     private class CachedClient implements AutoCloseable {
         private final IMetaStoreClient client;
+        private volatile Throwable throwable;
 
         private CachedClient(HiveConf hiveConf) throws MetaException {
             String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
@@ -312,10 +409,14 @@ public class PooledHiveMetaStoreClient {
             }
         }
 
+        public void setThrowable(Throwable throwable) {
+            this.throwable = throwable;
+        }
+
         @Override
         public void close() throws Exception {
             synchronized (clientPool) {
-                if (clientPool.size() > poolSize) {
+                if (throwable != null || clientPool.size() > poolSize) {
                     client.close();
                 } else {
                     clientPool.offer(this);
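
The try-inside-try blocks added throughout this class implement one pattern: if a call on a pooled metastore client throws, the client is marked bad via setThrowable() and close() then destroys it instead of returning it to the pool. A minimal generic sketch of the same idea, with made-up types (not Doris code):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class PooledClientSketch {
        // Stand-in for a remote client; only what the sketch needs.
        interface RawClient {
            String call(String request) throws Exception;
            void shutdown();
        }

        private static final int POOL_SIZE = 4;
        private static final Deque<PooledClient> POOL = new ArrayDeque<>();

        static class PooledClient implements AutoCloseable {
            final RawClient raw;
            private volatile Throwable throwable;

            PooledClient(RawClient raw) {
                this.raw = raw;
            }

            void setThrowable(Throwable t) {
                this.throwable = t;
            }

            @Override
            public void close() {
                synchronized (POOL) {
                    // A client that has thrown is destroyed; healthy clients are recycled.
                    if (throwable != null || POOL.size() >= POOL_SIZE) {
                        raw.shutdown();
                    } else {
                        POOL.offer(this);
                    }
                }
            }
        }

        static String callOnce(PooledClient client, String request) {
            try (PooledClient c = client) {
                try {
                    return c.raw.call(request);
                } catch (Exception e) {
                    c.setThrowable(e); // mark as broken so close() does not return it to the pool
                    throw e;
                }
            } catch (Exception e) {
                throw new RuntimeException("metastore call failed: " + request, e);
            }
        }
    }
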
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 3d5a08e15a..498401b05b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -34,6 +34,8 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.S3Util;
 import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.hive.AcidInfo;
+import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.external.hudi.HudiScanNode;
@@ -60,6 +62,9 @@ import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TScanRange;
 import org.apache.doris.thrift.TScanRangeLocation;
 import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+import org.apache.doris.thrift.TTransactionalHiveDeleteDeltaDesc;
+import org.apache.doris.thrift.TTransactionalHiveDesc;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -68,6 +73,7 @@ import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -274,11 +280,35 @@ public abstract class FileQueryScanNode extends FileScanNode {
 
             // If fileSplit has partition values, use the values collected from hive partitions.
             // Otherwise, use the values in file path.
+            boolean isACID = false;
+            if (fileSplit instanceof HiveSplit) {
+                HiveSplit hiveSplit = (HiveSplit) split;
+                isACID = hiveSplit.isACID();
+            }
             List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
-                    ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false)
+                    ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
                     : fileSplit.getPartitionValues();
 
             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
+            if (isACID) {
+                HiveSplit hiveSplit = (HiveSplit) split;
+                hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
+                TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+                tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
+                AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
+                TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
+                transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
+                List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
+                for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
+                    TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
+                    deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
+                    deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
+                    deleteDeltaDescs.add(deleteDeltaDesc);
+                }
+                transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
+                tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
+                rangeDesc.setTableFormatParams(tableFormatFileDesc);
+            }
             // external data lake table
             if (fileSplit instanceof IcebergSplit) {
                 // TODO: extract all data lake split to factory
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 694e97a7d9..9a050cc139 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileSplit.FileSplitCreator;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -203,6 +204,13 @@ public abstract class FileScanNode extends ExternalScanNode {
 
     protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
             long modificationTime, boolean splittable, List<String> partitionValues) throws IOException {
+        return splitFile(path, blockSize, blockLocations, length, modificationTime, splittable, partitionValues,
+                FileSplitCreator.DEFAULT);
+    }
+
+    protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
+            long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
+            throws IOException {
         if (blockLocations == null) {
             blockLocations = new BlockLocation[0];
         }
@@ -216,7 +224,7 @@ public abstract class FileScanNode extends ExternalScanNode {
         if (!splittable) {
             LOG.debug("Path {} is not splittable.", path);
             String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
-            result.add(new FileSplit(path, 0, length, length, modificationTime, hosts, partitionValues));
+            result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
             return result;
         }
         long bytesRemaining;
@@ -224,13 +232,13 @@ public abstract class FileScanNode extends ExternalScanNode {
                 bytesRemaining -= splitSize) {
             int location = getBlockIndex(blockLocations, length - bytesRemaining);
             String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
-            result.add(new FileSplit(path, length - bytesRemaining, splitSize,
+            result.add(splitCreator.create(path, length - bytesRemaining, splitSize,
                     length, modificationTime, hosts, partitionValues));
         }
         if (bytesRemaining != 0L) {
             int location = getBlockIndex(blockLocations, length - bytesRemaining);
             String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
-            result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
+            result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining,
                     length, modificationTime, hosts, partitionValues));
         }
 
@@ -238,7 +246,7 @@ public abstract class FileScanNode extends ExternalScanNode {
         return result;
     }
 
-    private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+    protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
         if (blkLocations == null || blkLocations.length == 0) {
             return -1;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 21b88f9c2b..4e9d0bae56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -66,4 +66,15 @@ public class FileSplit implements Split {
     public Object getInfo() {
         return null;
     }
+
+    public static class FileSplitCreator implements SplitCreator {
+
+        public static final FileSplitCreator DEFAULT = new FileSplitCreator();
+
+        @Override
+        public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
+                List<String> partitionValues) {
+            return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
+        }
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 79daa2df32..b8ac376307 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -39,6 +39,7 @@ import org.apache.doris.datasource.hive.HivePartition;
 import org.apache.doris.datasource.hive.HiveTransaction;
 import org.apache.doris.planner.ListPartitionPrunerV2;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.HiveSplit.HiveSplitCreator;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -104,7 +105,7 @@ public class HiveScanNode extends FileQueryScanNode {
 
         if (hmsTable.isHiveTransactionalTable()) {
             this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
-                    ConnectContext.get().getQualifiedUser(), hmsTable);
+                    ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable());
             Env.getCurrentHiveTransactionMgr().register(hiveTransaction);
         }
     }
@@ -192,7 +193,8 @@ public class HiveScanNode extends FileQueryScanNode {
                 for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
                     allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
                             status.getBlockLocations(), status.getLength(), status.getModificationTime(),
-                            isSplittable, fileCacheValue.getPartitionValues()));
+                            isSplittable, fileCacheValue.getPartitionValues(),
+                            new HiveSplitCreator(fileCacheValue.getAcidInfo())));
                 }
             }
         }
@@ -208,7 +210,7 @@ public class HiveScanNode extends FileQueryScanNode {
         }
         ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
                 ((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
-        return cache.getFilesByTransaction(partitions, validWriteIds);
+        return cache.getFilesByTransaction(partitions, validWriteIds, hiveTransaction.isFullAcid());
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
new file mode 100644
index 0000000000..0f230c85f4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.datasource.hive.AcidInfo;
+import org.apache.doris.spi.Split;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+public class HiveSplit extends FileSplit {
+
+    public HiveSplit(Path path, long start, long length, long fileLength,
+            long modificationTime, String[] hosts, List<String> partitionValues, AcidInfo acidInfo) {
+        super(path, start, length, fileLength, modificationTime, hosts, partitionValues);
+        this.acidInfo = acidInfo;
+    }
+
+    public HiveSplit(Path path, long start, long length, long fileLength, String[] hosts, AcidInfo acidInfo) {
+        super(path, start, length, fileLength, hosts, null);
+        this.acidInfo = acidInfo;
+    }
+
+    @Override
+    public Object getInfo() {
+        return acidInfo;
+    }
+
+    private AcidInfo acidInfo;
+
+    public boolean isACID() {
+        return acidInfo != null;
+    }
+
+    public static class HiveSplitCreator implements SplitCreator {
+
+        private AcidInfo acidInfo;
+
+        public HiveSplitCreator(AcidInfo acidInfo) {
+            this.acidInfo = acidInfo;
+        }
+
+        public HiveSplitCreator() {
+            this(null);
+        }
+
+        @Override
+        public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
+                List<String> partitionValues) {
+            return new HiveSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues, acidInfo);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitCreator.java
similarity index 72%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitCreator.java
index f97c8ea1ec..ff9d4dd92e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitCreator.java
@@ -17,19 +17,13 @@
 
 package org.apache.doris.planner.external;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi"),
-    PAIMON("paimon");
+import org.apache.doris.spi.Split;
 
-    private final String tableFormatType;
+import org.apache.hadoop.fs.Path;
 
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
+import java.util.List;
 
-    public String value() {
-        return tableFormatType;
-    }
+public interface SplitCreator {
+    Split create(Path path, long start, long length, long fileLength,
+            long modificationTime, String[] hosts, List<String> partitionValues);
 }
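
A rough usage sketch of the new SplitCreator hook (hypothetical driver code; the creator classes and the create() signature are the ones introduced in this patch): plain files can use FileSplitCreator.DEFAULT, while a HiveSplitCreator carrying AcidInfo produces HiveSplits for full-ACID partitions.

    import org.apache.doris.datasource.hive.AcidInfo;
    import org.apache.doris.planner.external.FileSplit.FileSplitCreator;
    import org.apache.doris.planner.external.HiveSplit.HiveSplitCreator;
    import org.apache.doris.planner.external.SplitCreator;
    import org.apache.doris.spi.Split;

    import org.apache.hadoop.fs.Path;

    import java.util.Collections;

    public class SplitCreatorSketch {
        // Hypothetical selection logic: ACID partitions get HiveSplits carrying AcidInfo,
        // everything else gets plain FileSplits.
        static SplitCreator pickCreator(AcidInfo acidInfo) {
            return acidInfo == null ? FileSplitCreator.DEFAULT : new HiveSplitCreator(acidInfo);
        }

        public static void main(String[] args) {
            SplitCreator creator = pickCreator(null);
            Split split = creator.create(new Path("hdfs://nn/warehouse/t/par=20230101/000000_0"),
                    0, 128 * 1024 * 1024, 128 * 1024 * 1024, System.currentTimeMillis(),
                    null, Collections.emptyList());
            System.out.println(split.getClass().getSimpleName()); // FileSplit
        }
    }
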
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index f97c8ea1ec..891e138db6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -21,7 +21,8 @@ public enum TableFormatType {
     HIVE("hive"),
     ICEBERG("iceberg"),
     HUDI("hudi"),
-    PAIMON("paimon");
+    PAIMON("paimon"),
+    TRANSACTIONAL_HIVE("transactional_hive");
 
     private final String tableFormatType;
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index f3015f18cb..4844d2ed7f 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -314,11 +314,22 @@ struct THudiFileDesc {
     10: optional list<string> nested_fields;
 }
 
+struct TTransactionalHiveDeleteDeltaDesc {
+    1: optional string directory_location
+    2: optional list<string> file_names
+}
+
+struct TTransactionalHiveDesc {
+    1: optional string partition
+    2: optional list<TTransactionalHiveDeleteDeltaDesc> delete_deltas
+}
+
 struct TTableFormatFileDesc {
     1: optional string table_format_type
     2: optional TIcebergFileDesc iceberg_params
     3: optional THudiFileDesc hudi_params
     4: optional TPaimonFileDesc paimon_params
+    5: optional TTransactionalHiveDesc transactional_hive_params
 }
 
 struct TFileScanRangeParams {
diff --git a/regression-test/data/external_catalog_p0/hive/test_transactional_hive.out b/regression-test/data/external_catalog_p0/hive/test_transactional_hive.out
new file mode 100644
index 0000000000..e4c6a6c6d2
--- /dev/null
+++ b/regression-test/data/external_catalog_p0/hive/test_transactional_hive.out
@@ -0,0 +1,33 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1	A
+2	B
+3	CC
+
+-- !q02 --
+A
+B
+CC
+
+-- !q03 --
+3	CC
+
+-- !q01 --
+1	A	20230101
+2	BB	20230101
+3	C	20230101
+4	D	20230102
+5	E	20230102
+6	F	20230102
+
+-- !q02 --
+A
+BB
+C
+D
+E
+F
+
+-- !q03 --
+2	BB	20230101
+
diff --git a/regression-test/suites/external_catalog_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_catalog_p0/hive/test_transactional_hive.groovy
new file mode 100644
index 0000000000..d15f49749f
--- /dev/null
+++ b/regression-test/suites/external_catalog_p0/hive/test_transactional_hive.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_transactional_hive", "p0") {
+    def q01 = {
+        qt_q01 """
+        select * from orc_full_acid order by id;
+        """
+        qt_q02 """
+        select value from orc_full_acid order by id;
+        """
+        qt_q03 """
+        select * from orc_full_acid where value = 'CC' order by id;
+        """
+    }
+
+    def q01_par = {
+        qt_q01 """
+        select * from orc_full_acid_par order by id;
+        """
+        qt_q02 """
+        select value from orc_full_acid_par order by id;
+        """
+        qt_q03 """
+        select * from orc_full_acid_par where value = 'BB' order by id;
+        """
+    }
+    String enabled = context.config.otherConfigs.get("enableHiveTest")
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        try {
+            String hms_port = context.config.otherConfigs.get("hms_port")
+            String catalog_name = "test_transactional_hive"
+            sql """drop catalog if exists ${catalog_name}"""
+            sql """create catalog if not exists ${catalog_name} properties (
+                "type"="hms",
+                'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}'
+            );"""
+            sql """use `${catalog_name}`.`default`"""
+
+            q01()
+            q01_par()
+
+            sql """drop catalog if exists ${catalog_name}"""
+        } finally {
+        }
+    }
+}

