You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/13 00:55:23 UTC
[doris] branch master updated: [Feature][Fix](multi-catalog) Implements transactional hive full acid tables. (#20679)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 73ad885e19 [Feature][Fix](multi-catalog) Implements transactional hive full acid tables. (#20679)
73ad885e19 is described below
commit 73ad885e19ef9d134d22c2c222f991fb6167aec3
Author: Qi Chen <ka...@gmail.com>
AuthorDate: Tue Jun 13 08:55:16 2023 +0800
[Feature][Fix](multi-catalog) Implements transactional hive full acid tables. (#20679)
After adding support for insert-only transactional Hive tables (#19518, #19419), this PR adds support for full-ACID transactional Hive tables.
Hive 3 full-ACID transactional tables are supported.
Hive 2 full-ACID transactional tables require a major compaction to be run first.
---
be/src/service/internal_service.cpp | 3 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 137 ++++++++++-----
be/src/vec/exec/format/orc/vorc_reader.h | 28 ++-
.../format/table/transactional_hive_common.cpp | 72 ++++++++
.../exec/format/table/transactional_hive_common.h | 54 ++++++
.../format/table/transactional_hive_reader.cpp | 191 +++++++++++++++++++++
.../exec/format/table/transactional_hive_reader.h | 144 ++++++++++++++++
be/src/vec/exec/scan/vfile_scanner.cpp | 25 ++-
be/src/vec/exprs/vexpr_context.cpp | 4 +
.../hive/scripts/create_preinstalled_table.hql | 39 +++++
.../doris/catalog/external/HMSExternalTable.java | 25 +--
.../org/apache/doris/common/util/BrokerUtil.java | 9 +-
.../org/apache/doris/datasource/hive/AcidInfo.java | 114 ++++++++++++
.../doris/datasource/hive/HiveMetaStoreCache.java | 61 ++++++-
.../doris/datasource/hive/HiveTransaction.java | 9 +-
.../datasource/hive/PooledHiveMetaStoreClient.java | 167 ++++++++++++++----
.../doris/planner/external/FileQueryScanNode.java | 32 +++-
.../doris/planner/external/FileScanNode.java | 16 +-
.../apache/doris/planner/external/FileSplit.java | 11 ++
.../doris/planner/external/HiveScanNode.java | 8 +-
.../apache/doris/planner/external/HiveSplit.java | 69 ++++++++
.../{TableFormatType.java => SplitCreator.java} | 18 +-
.../doris/planner/external/TableFormatType.java | 3 +-
gensrc/thrift/PlanNodes.thrift | 11 ++
.../hive/test_transactional_hive.out | 33 ++++
.../hive/test_transactional_hive.groovy | 61 +++++++
26 files changed, 1205 insertions(+), 139 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 502d897a76..c3ed7313e9 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -576,8 +576,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
break;
}
case TFileFormatType::FORMAT_ORC: {
- std::vector<std::string> column_names;
- reader = vectorized::OrcReader::create_unique(params, range, column_names, "", &io_ctx);
+ reader = vectorized::OrcReader::create_unique(params, range, "", &io_ctx);
break;
}
case TFileFormatType::FORMAT_JSON: {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index c3d80b6422..9fd4bc57d9 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -69,6 +69,7 @@
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/table/transactional_hive_common.h"
#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vin_predicate.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
@@ -86,14 +87,6 @@ enum class FileCachePolicy : uint8_t;
namespace doris::vectorized {
-static const char* ACID_EVENT_FIELD_NAMES[] = {"operation", "originalTransaction", "bucket",
- "rowId", "currentTransaction", "row"};
-
-static const char* ACID_EVENT_FIELD_NAMES_LOWER_CASE[] = {
- "operation", "originaltransaction", "bucket", "rowid", "currenttransaction", "row"};
-
-static const int ACID_ROW_OFFSET = 5;
-
#define FOR_FLAT_ORC_COLUMNS(M) \
M(TypeIndex::Int8, Int8, orc::LongVectorBatch) \
M(TypeIndex::UInt8, UInt8, orc::LongVectorBatch) \
@@ -133,8 +126,8 @@ void ORCFileInputStream::read(void* buf, uint64_t length, uint64_t offset) {
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
- const std::vector<std::string>& column_names, size_t batch_size,
- const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat)
+ size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
+ bool enable_lazy_mat)
: _profile(profile),
_state(state),
_scan_params(params),
@@ -143,7 +136,6 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
_range_start_offset(range.start_offset),
_range_size(range.size),
_ctz(ctz),
- _column_names(column_names),
_is_hive(params.__isset.slot_name_to_schema_pos),
_io_ctx(io_ctx),
_enable_lazy_mat(enable_lazy_mat) {
@@ -154,13 +146,11 @@ OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
}
OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
- const std::vector<std::string>& column_names, const std::string& ctz,
- io::IOContext* io_ctx, bool enable_lazy_mat)
+ const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat)
: _profile(nullptr),
_scan_params(params),
_scan_range(range),
_ctz(ctz),
- _column_names(column_names),
_is_hive(params.__isset.slot_name_to_schema_pos),
_file_system(nullptr),
_io_ctx(io_ctx),
@@ -240,11 +230,14 @@ Status OrcReader::_create_file_reader() {
}
Status OrcReader::init_reader(
+ const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
- VExprContextSPtrs& conjuncts) {
+ const VExprContextSPtrs& conjuncts, bool is_acid) {
+ _column_names = column_names;
_colname_to_value_range = colname_to_value_range;
_text_converter.reset(new TextConverter('\\'));
_lazy_read_ctx.conjuncts = conjuncts;
+ _is_acid = is_acid;
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
RETURN_IF_ERROR(_create_file_reader());
RETURN_IF_ERROR(_init_read_columns());
@@ -254,7 +247,7 @@ Status OrcReader::init_reader(
Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) {
RETURN_IF_ERROR(_create_file_reader());
- auto& root_type = _remove_acid(_reader->getType());
+ auto& root_type = _is_acid ? _remove_acid(_reader->getType()) : _reader->getType();
for (int i = 0; i < root_type.getSubtypeCount(); ++i) {
col_names->emplace_back(_get_field_name_lower_case(&root_type, i));
col_types->emplace_back(_convert_to_doris_type(root_type.getSubtype(i)));
@@ -264,20 +257,20 @@ Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
Status OrcReader::_init_read_columns() {
auto& root_type = _reader->getType();
- _is_acid = _check_acid_schema(root_type);
-
std::vector<std::string> orc_cols;
std::vector<std::string> orc_cols_lower_case;
- _init_orc_cols(root_type, orc_cols, orc_cols_lower_case);
+ _init_orc_cols(root_type, orc_cols, orc_cols_lower_case, _type_map);
- for (auto& col_name : _column_names) {
+ for (size_t i = 0; i < _column_names->size(); ++i) {
+ auto& col_name = (*_column_names)[i];
if (_is_hive) {
auto iter = _scan_params.slot_name_to_schema_pos.find(col_name);
if (iter != _scan_params.slot_name_to_schema_pos.end()) {
int pos = iter->second;
- if (_is_acid) {
- if (ACID_ROW_OFFSET + 1 + pos < orc_cols_lower_case.size()) {
- orc_cols_lower_case[ACID_ROW_OFFSET + 1 + pos] = iter->first;
+ if (_is_acid && i < _column_names->size() - TransactionalHive::READ_PARAMS.size()) {
+ if (TransactionalHive::ROW_OFFSET + 1 + pos < orc_cols_lower_case.size()) {
+ // shift TransactionalHive::ROW_OFFSET + 1 offset, 1 is row struct col
+ orc_cols_lower_case[TransactionalHive::ROW_OFFSET + 1 + pos] = iter->first;
}
} else {
if (pos < orc_cols_lower_case.size()) {
@@ -291,9 +284,11 @@ Status OrcReader::_init_read_columns() {
_missing_cols.emplace_back(col_name);
} else {
int pos = std::distance(orc_cols_lower_case.begin(), iter);
- if (_is_acid) {
- auto read_col = fmt::format("{}.{}", ACID_EVENT_FIELD_NAMES[ACID_ROW_OFFSET],
- orc_cols[pos]);
+ if (_is_acid && i < _column_names->size() - TransactionalHive::READ_PARAMS.size()) {
+ auto read_col = fmt::format(
+ "{}.{}",
+ TransactionalHive::ACID_COLUMN_NAMES[TransactionalHive::ROW_OFFSET],
+ orc_cols[pos]);
_read_cols.emplace_back(read_col);
} else {
_read_cols.emplace_back(orc_cols[pos]);
@@ -311,14 +306,18 @@ Status OrcReader::_init_read_columns() {
}
void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
- std::vector<std::string>& orc_cols_lower_case) {
+ std::vector<std::string>& orc_cols_lower_case,
+ std::unordered_map<std::string, const orc::Type*>& type_map) {
for (int i = 0; i < type.getSubtypeCount(); ++i) {
orc_cols.emplace_back(type.getFieldName(i));
- orc_cols_lower_case.emplace_back(_get_field_name_lower_case(&type, i));
+ auto filed_name_lower_case = _get_field_name_lower_case(&type, i);
+ auto filed_name_lower_case_copy = filed_name_lower_case;
+ orc_cols_lower_case.emplace_back(std::move(filed_name_lower_case));
+ type_map.emplace(std::move(filed_name_lower_case_copy), type.getSubtype(i));
if (_is_acid) {
const orc::Type* sub_type = type.getSubtype(i);
if (sub_type->getKind() == orc::TypeKind::STRUCT) {
- _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case);
+ _init_orc_cols(*sub_type, orc_cols, orc_cols_lower_case, type_map);
}
}
}
@@ -326,7 +325,7 @@ void OrcReader::_init_orc_cols(const orc::Type& type, std::vector<std::string>&
bool OrcReader::_check_acid_schema(const orc::Type& type) {
if (orc::TypeKind::STRUCT == type.getKind()) {
- if (type.getSubtypeCount() != std::size(ACID_EVENT_FIELD_NAMES)) {
+ if (type.getSubtypeCount() != TransactionalHive::ACID_COLUMN_NAMES.size()) {
return false;
}
for (uint64_t i = 0; i < type.getSubtypeCount(); ++i) {
@@ -334,7 +333,7 @@ bool OrcReader::_check_acid_schema(const orc::Type& type) {
std::string field_name_lower_case = field_name;
std::transform(field_name.begin(), field_name.end(), field_name_lower_case.begin(),
[](unsigned char c) { return std::tolower(c); });
- if (field_name_lower_case != ACID_EVENT_FIELD_NAMES_LOWER_CASE[i]) {
+ if (field_name_lower_case != TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE[i]) {
return false;
}
}
@@ -344,7 +343,7 @@ bool OrcReader::_check_acid_schema(const orc::Type& type) {
const orc::Type& OrcReader::_remove_acid(const orc::Type& type) {
if (_check_acid_schema(type)) {
- return *(type.getSubtype(ACID_ROW_OFFSET));
+ return *(type.getSubtype(TransactionalHive::ROW_OFFSET));
} else {
return type;
}
@@ -1193,7 +1192,8 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return Status::OK();
}
}
- const auto& batch_vec = down_cast<orc::StructVectorBatch*>(_batch.get())->fields;
+ std::vector<orc::ColumnVectorBatch*> batch_vec;
+ _fill_batch_vec(batch_vec, _batch.get(), 0);
for (auto& col_name : _lazy_read_ctx.lazy_read_columns) {
auto& column_with_type_and_name = block->get_by_name(col_name);
auto& column_ptr = column_with_type_and_name.column;
@@ -1247,14 +1247,32 @@ Status OrcReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
_fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
+ _build_delete_row_filter(block, rr);
+
+ std::vector<uint32_t> columns_to_filter;
+ int column_to_keep = block->columns();
+ columns_to_filter.resize(column_to_keep);
+ for (uint32_t i = 0; i < column_to_keep; ++i) {
+ columns_to_filter[i] = i;
+ }
if (!_lazy_read_ctx.conjuncts.empty()) {
- int column_to_keep = block->columns();
VExprContextSPtrs filter_conjuncts;
for (auto& conjunct : _lazy_read_ctx.conjuncts) {
filter_conjuncts.push_back(conjunct);
}
- RETURN_IF_ERROR(
- VExprContext::filter_block(_lazy_read_ctx.conjuncts, block, column_to_keep));
+ std::vector<IColumn::Filter*> filters;
+ if (_delete_rows_filter_ptr) {
+ filters.push_back(_delete_rows_filter_ptr.get());
+ }
+ RETURN_IF_CATCH_EXCEPTION(
+ RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
+ filter_conjuncts, &filters, block, columns_to_filter, column_to_keep)));
+ } else {
+ if (_delete_rows_filter_ptr) {
+ RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
+ (*_delete_rows_filter_ptr)));
+ }
+ Block::erase_useless_column(block, column_to_keep);
}
}
return Status::OK();
@@ -1270,12 +1288,48 @@ void OrcReader::_fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
}
}
+// Builds `_delete_rows_filter_ptr` for the current batch of `rows` rows when a set of
+// deleted ACID row ids has been registered through set_delete_rows(). The filter holds
+// 1 for rows to keep and 0 for rows whose (originalTransaction, bucket, rowId) triple is
+// contained in `_delete_rows`. The three ACID columns are looked up by name in `block`.
+// Does nothing (and leaves `_delete_rows_filter_ptr` untouched) when no delete set is set.
+void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
+    if (_delete_rows == nullptr) {
+        return;
+    }
+    _delete_rows_filter_ptr.reset(new IColumn::Filter(rows, 1));
+    auto* __restrict delete_filter_data = _delete_rows_filter_ptr->data();
+
+    // The ACID columns may be nullable; read the values from the nested columns directly
+    // instead of going through a per-row get_int() call.
+    const auto& original_transaction_data =
+            assert_cast<const ColumnInt64&>(
+                    *remove_nullable(
+                            block->get_by_name(TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE)
+                                    .column))
+                    .get_data();
+    const auto& bucket_id_data =
+            assert_cast<const ColumnInt32&>(
+                    *remove_nullable(block->get_by_name(TransactionalHive::BUCKET_LOWER_CASE).column))
+                    .get_data();
+    const auto& row_id_data =
+            assert_cast<const ColumnInt64&>(
+                    *remove_nullable(block->get_by_name(TransactionalHive::ROW_ID_LOWER_CASE).column))
+                    .get_data();
+
+    for (size_t i = 0; i < rows; ++i) {
+        TransactionalHiveReader::AcidRowID transactional_row_id = {
+                original_transaction_data[i], bucket_id_data[i], row_id_data[i]};
+        if (_delete_rows->contains(transactional_row_id)) {
+            delete_filter_data[i] = 0;
+        }
+    }
+}
+
Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
Block* block = (Block*)arg;
size_t origin_column_num = block->columns();
- const auto& batch_vec = down_cast<orc::StructVectorBatch*>(&data)->fields;
- for (auto& col_name : _lazy_read_ctx.predicate_columns.first) {
+ std::vector<orc::ColumnVectorBatch*> batch_vec;
+ _fill_batch_vec(batch_vec, &data, 0);
+ std::vector<string> col_names;
+ col_names.insert(col_names.end(), _lazy_read_ctx.predicate_columns.first.begin(),
+ _lazy_read_ctx.predicate_columns.first.end());
+ if (_is_acid) {
+ col_names.insert(col_names.end(),
+ TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
+ TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
+ }
+ for (auto& col_name : col_names) {
auto& column_with_type_and_name = block->get_by_name(col_name);
auto& column_ptr = column_with_type_and_name.column;
auto& column_type = column_with_type_and_name.type;
@@ -1295,6 +1349,9 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
_lazy_read_ctx.resize_first_column = true;
}
+ // transactional hive orc delete row
+ _build_delete_row_filter(block, size);
+
_filter.reset(new IColumn::Filter(size, 1));
auto* __restrict result_filter_data = _filter->data();
bool can_filter_all = false;
@@ -1310,8 +1367,8 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
}
if (can_filter_all) {
- for (auto& col : _lazy_read_ctx.predicate_columns.first) {
- // clean block to read predicate columns
+ for (auto& col : col_names) {
+ // clean block to read predicate columns and acid columns
block->get_by_name(col).column->assume_mutable()->clear();
}
for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 1230b782f8..2a62bfb93c 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -49,6 +49,7 @@
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/format_common.h"
#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/table/transactional_hive_reader.h"
namespace doris {
class RuntimeState;
@@ -131,19 +132,18 @@ public:
};
OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, const std::vector<std::string>& column_names,
- size_t batch_size, const std::string& ctz, io::IOContext* io_ctx,
- bool enable_lazy_mat = true);
+ const TFileRangeDesc& range, size_t batch_size, const std::string& ctz,
+ io::IOContext* io_ctx, bool enable_lazy_mat = true);
OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
- const std::vector<std::string>& column_names, const std::string& ctz,
- io::IOContext* io_ctx, bool enable_lazy_mat = true);
+ const std::string& ctz, io::IOContext* io_ctx, bool enable_lazy_mat = true);
~OrcReader() override;
Status init_reader(
+ const std::vector<std::string>* column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
- VExprContextSPtrs& conjuncts);
+ const VExprContextSPtrs& conjuncts, bool is_acid);
Status set_fill_columns(
const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
@@ -165,6 +165,8 @@ public:
void _fill_batch_vec(std::vector<orc::ColumnVectorBatch*>& result,
orc::ColumnVectorBatch* batch, int idx);
+ void _build_delete_row_filter(const Block* block, size_t rows);
+
void close();
int64_t size() const;
@@ -178,6 +180,10 @@ public:
Status filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg);
+    // Registers the set of deleted ACID row ids consulted by _build_delete_row_filter().
+    // The set is not owned (only a raw pointer is kept), so the caller must keep it alive
+    // for as long as this reader reads batches.
+    void set_delete_rows(const TransactionalHiveReader::AcidRowIDSet* rows) { _delete_rows = rows; }
+
private:
struct OrcProfile {
RuntimeProfile::Counter* read_time;
@@ -211,7 +217,8 @@ private:
void _init_profile();
Status _init_read_columns();
void _init_orc_cols(const orc::Type& type, std::vector<std::string>& orc_cols,
- std::vector<std::string>& orc_cols_lower_case);
+ std::vector<std::string>& orc_cols_lower_case,
+ std::unordered_map<std::string, const orc::Type*>& type_map);
static bool _check_acid_schema(const orc::Type& type);
static const orc::Type& _remove_acid(const orc::Type& type);
TypeDescriptor _convert_to_doris_type(const orc::Type* orc_type);
@@ -423,7 +430,7 @@ private:
int64_t _range_start_offset;
int64_t _range_size;
const std::string& _ctz;
- const std::vector<std::string>& _column_names;
+ const std::vector<std::string>* _column_names;
cctz::time_zone _time_zone;
std::list<std::string> _read_cols;
@@ -437,6 +444,7 @@ private:
// Flag for hive engine. True if the external table engine is Hive.
bool _is_hive = false;
std::unordered_map<std::string, std::string> _col_name_to_file_col_name;
+ std::unordered_map<std::string, const orc::Type*> _type_map;
std::vector<const orc::Type*> _col_orc_type;
std::unique_ptr<ORCFileInputStream> _file_input_stream;
Statistics _statistics;
@@ -459,10 +467,12 @@ private:
size_t _decimal_scale_params_index;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+ bool _is_acid = false;
std::unique_ptr<IColumn::Filter> _filter = nullptr;
LazyReadContext _lazy_read_ctx;
std::unique_ptr<TextConverter> _text_converter = nullptr;
- bool _is_acid = false;
+ const TransactionalHiveReader::AcidRowIDSet* _delete_rows = nullptr;
+ std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr = nullptr;
};
class ORCFileInputStream : public orc::InputStream {
diff --git a/be/src/vec/exec/format/table/transactional_hive_common.cpp b/be/src/vec/exec/format/table/transactional_hive_common.cpp
new file mode 100644
index 0000000000..85e279031c
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_common.cpp
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transactional_hive_common.h"
+
+namespace doris::vectorized {
+
+namespace {
+// Projects either the file-case or the lower-case column names out of `params`,
+// preserving their order. Used so the name lists below cannot drift out of sync
+// with the parameter lists they are derived from.
+std::vector<std::string> column_names_of(const std::vector<TransactionalHive::Param>& params,
+                                         bool lower_case) {
+    std::vector<std::string> names;
+    names.reserve(params.size());
+    for (const auto& param : params) {
+        names.push_back(lower_case ? param.column_lower_case : param.column_name);
+    }
+    return names;
+}
+} // namespace
+
+// NOTE: everything below is dynamically initialized at namespace scope. Definitions
+// in this file may depend on earlier ones (initialization is ordered within a
+// translation unit), but other translation units must not read these from their own
+// static initializers.
+
+// Field names of the ACID event struct, as written in the file.
+const std::string TransactionalHive::OPERATION = "operation";
+const std::string TransactionalHive::ORIGINAL_TRANSACTION = "originalTransaction";
+const std::string TransactionalHive::BUCKET = "bucket";
+const std::string TransactionalHive::ROW_ID = "rowId";
+const std::string TransactionalHive::CURRENT_TRANSACTION = "currentTransaction";
+const std::string TransactionalHive::ROW = "row";
+
+// Lower-case variants of the field names, used for case-insensitive matching.
+const std::string TransactionalHive::OPERATION_LOWER_CASE = "operation";
+const std::string TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE = "originaltransaction";
+const std::string TransactionalHive::BUCKET_LOWER_CASE = "bucket";
+const std::string TransactionalHive::ROW_ID_LOWER_CASE = "rowid";
+const std::string TransactionalHive::CURRENT_TRANSACTION_LOWER_CASE = "currenttransaction";
+const std::string TransactionalHive::ROW_LOWER_CASE = "row";
+
+// Index of the `row` field (the struct holding the user columns) in ACID_COLUMN_NAMES.
+const int TransactionalHive::ROW_OFFSET = 5;
+
+// Row-identity columns read from delete-delta files.
+const std::vector<TransactionalHive::Param> TransactionalHive::DELETE_ROW_PARAMS = {
+        {TransactionalHive::ORIGINAL_TRANSACTION,
+         TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE, PrimitiveType::TYPE_BIGINT},
+        {TransactionalHive::BUCKET, TransactionalHive::BUCKET_LOWER_CASE, PrimitiveType::TYPE_INT},
+        {TransactionalHive::ROW_ID, TransactionalHive::ROW_ID_LOWER_CASE,
+         PrimitiveType::TYPE_BIGINT}};
+
+// Row-identity columns read from data files alongside the user columns.
+const std::vector<TransactionalHive::Param> TransactionalHive::READ_PARAMS = {
+        {TransactionalHive::ORIGINAL_TRANSACTION,
+         TransactionalHive::ORIGINAL_TRANSACTION_LOWER_CASE, PrimitiveType::TYPE_BIGINT},
+        {TransactionalHive::BUCKET, TransactionalHive::BUCKET_LOWER_CASE, PrimitiveType::TYPE_INT},
+        {TransactionalHive::ROW_ID, TransactionalHive::ROW_ID_LOWER_CASE,
+         PrimitiveType::TYPE_BIGINT}};
+
+// Name lists derived from the parameter lists above (same order).
+const std::vector<std::string> TransactionalHive::DELETE_ROW_COLUMN_NAMES =
+        column_names_of(DELETE_ROW_PARAMS, false);
+
+const std::vector<std::string> TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE =
+        column_names_of(DELETE_ROW_PARAMS, true);
+
+const std::vector<std::string> TransactionalHive::READ_ROW_COLUMN_NAMES =
+        column_names_of(READ_PARAMS, false);
+
+const std::vector<std::string> TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE =
+        column_names_of(READ_PARAMS, true);
+
+// All fields of the ACID event struct, in file order.
+const std::vector<std::string> TransactionalHive::ACID_COLUMN_NAMES = {
+        OPERATION, ORIGINAL_TRANSACTION, BUCKET, ROW_ID, CURRENT_TRANSACTION, ROW};
+
+const std::vector<std::string> TransactionalHive::ACID_COLUMN_NAMES_LOWER_CASE = {
+        OPERATION_LOWER_CASE, ORIGINAL_TRANSACTION_LOWER_CASE, BUCKET_LOWER_CASE,
+        ROW_ID_LOWER_CASE,    CURRENT_TRANSACTION_LOWER_CASE,  ROW_LOWER_CASE};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_common.h b/be/src/vec/exec/format/table/transactional_hive_common.h
new file mode 100644
index 0000000000..368af350cf
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_common.h
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <string>
+#include <vector>
+
+#include "runtime/define_primitive_type.h"
+
+namespace doris::vectorized {
+
+// Names, positions and Doris types of the ACID metadata columns found in full-ACID
+// transactional Hive ORC files. All members are defined in
+// transactional_hive_common.cpp.
+struct TransactionalHive {
+    // Each ACID event field name (file spelling) followed by its lower-case form.
+    static const std::string OPERATION;
+    static const std::string OPERATION_LOWER_CASE;
+    static const std::string ORIGINAL_TRANSACTION;
+    static const std::string ORIGINAL_TRANSACTION_LOWER_CASE;
+    static const std::string BUCKET;
+    static const std::string BUCKET_LOWER_CASE;
+    static const std::string ROW_ID;
+    static const std::string ROW_ID_LOWER_CASE;
+    static const std::string CURRENT_TRANSACTION;
+    static const std::string CURRENT_TRANSACTION_LOWER_CASE;
+    static const std::string ROW;
+    static const std::string ROW_LOWER_CASE;
+
+    // Position of the `row` field within the ACID event struct.
+    static const int ROW_OFFSET;
+
+    // One ACID column: its name as written in the file, its lower-case name and its type.
+    struct Param {
+        const std::string column_name;
+        const std::string column_lower_case;
+        const PrimitiveType type;
+    };
+
+    // Columns read from delete-delta files.
+    static const std::vector<Param> DELETE_ROW_PARAMS;
+    // Columns read from data files in addition to the user columns.
+    static const std::vector<Param> READ_PARAMS;
+
+    // Name-only views of the parameter lists above.
+    static const std::vector<std::string> DELETE_ROW_COLUMN_NAMES;
+    static const std::vector<std::string> DELETE_ROW_COLUMN_NAMES_LOWER_CASE;
+    static const std::vector<std::string> READ_ROW_COLUMN_NAMES;
+    static const std::vector<std::string> READ_ROW_COLUMN_NAMES_LOWER_CASE;
+
+    // Every field of the ACID event struct, in file order.
+    static const std::vector<std::string> ACID_COLUMN_NAMES;
+    static const std::vector<std::string> ACID_COLUMN_NAMES_LOWER_CASE;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.cpp b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
new file mode 100644
index 0000000000..a2ae767d56
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.cpp
@@ -0,0 +1,191 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "transactional_hive_reader.h"
+
+#include "runtime/runtime_state.h"
+#include "transactional_hive_common.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/format/orc/vorc_reader.h"
+
+// Forward declarations of types named by the definitions in this file.
+namespace doris::io {
+class IOContext;
+} // namespace doris::io
+
+namespace doris::vectorized {
+class VExprContext;
+} // namespace doris::vectorized
+
+namespace doris::vectorized {
+
+// Wraps the OrcReader for one full-ACID transactional Hive data file and registers
+// the "TransactionalHiveProfile" counters used while loading delete deltas.
+TransactionalHiveReader::TransactionalHiveReader(std::unique_ptr<GenericReader> file_format_reader,
+                                                 RuntimeProfile* profile, RuntimeState* state,
+                                                 const TFileScanRangeParams& params,
+                                                 const TFileRangeDesc& range, io::IOContext* io_ctx)
+        : TableFormatReader(std::move(file_format_reader)),
+          _profile(profile),
+          _state(state),
+          _params(params),
+          _range(range),
+          _io_ctx(io_ctx) {
+    static const char* transactional_hive_profile = "TransactionalHiveProfile";
+    ADD_TIMER(_profile, transactional_hive_profile);
+    _transactional_orc_profile.num_delete_files =
+            ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, transactional_hive_profile);
+    _transactional_orc_profile.num_delete_rows =
+            ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, transactional_hive_profile);
+    _transactional_orc_profile.delete_files_read_time =
+            ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", transactional_hive_profile);
+}
+
+// Initializes the wrapped OrcReader in ACID mode. The requested columns are followed by
+// the row-identity columns (READ_ROW_COLUMN_NAMES_LOWER_CASE) so that deleted rows can be
+// recognized. `_col_names` is a member because OrcReader keeps a pointer to it.
+Status TransactionalHiveReader::init_reader(
+        const std::vector<std::string>& column_names,
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+        const VExprContextSPtrs& conjuncts) {
+    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
+    _col_names.insert(_col_names.end(), column_names.begin(), column_names.end());
+    _col_names.insert(_col_names.end(), TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.begin(),
+                      TransactionalHive::READ_ROW_COLUMN_NAMES_LOWER_CASE.end());
+    return orc_reader->init_reader(&_col_names, colname_to_value_range, conjuncts, true);
+}
+
+// Reads the next batch. The row-identity columns are appended to `block` as scratch
+// columns for the OrcReader to fill, and are erased again before returning (also when
+// the read fails).
+Status TransactionalHiveReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    for (const auto& param : TransactionalHive::READ_PARAMS) {
+        DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
+                TypeDescriptor(param.type), false);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+                                            param.column_lower_case));
+    }
+    auto res = _file_format_reader->get_next_block(block, read_rows, eof);
+    Block::erase_useless_column(block, block->columns() - TransactionalHive::READ_PARAMS.size());
+    return res;
+}
+
+// Forwarded unchanged to the wrapped reader.
+Status TransactionalHiveReader::set_fill_columns(
+        const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                partition_columns,
+        const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) {
+    return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
+// Forwarded unchanged to the wrapped reader.
+bool TransactionalHiveReader::fill_all_columns() const {
+    return _file_format_reader->fill_all_columns();
+}
+
+// Forwarded unchanged to the wrapped reader.
+Status TransactionalHiveReader::get_columns(
+        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+        std::unordered_set<std::string>* missing_cols) {
+    return _file_format_reader->get_columns(name_to_type, missing_cols);
+}
+
+// Loads every delete-delta file that applies to this split's data file and hands the
+// resulting set of deleted (originalTransaction, bucket, rowId) triples to the wrapped
+// OrcReader. For each entry of `range.table_format_params.transactional_hive_params
+// .delete_deltas` whose `file_names` contains the data file's name, the file
+// `<directory_location>/<data file name>` is read with a non-ACID OrcReader.
+// NOTE(review): the data file name comes from `_range` while the delete deltas come from
+// the `range` argument; this assumes both describe the same split — TODO confirm.
+Status TransactionalHiveReader::init_row_filters(const TFileRangeDesc& range) {
+    // Only the file name of the data file is needed to locate its delete files, so any
+    // namenode prefix missing from `_range.path` does not matter here.
+    const std::string file_name = std::filesystem::path(_range.path).filename().string();
+
+    auto* orc_reader = static_cast<OrcReader*>(_file_format_reader.get());
+    int64_t num_delete_rows = 0;
+    int64_t num_delete_files = 0;
+
+    SCOPED_TIMER(_transactional_orc_profile.delete_files_read_time);
+    for (const auto& delete_delta :
+         range.table_format_params.transactional_hive_params.delete_deltas) {
+        if (std::find(delete_delta.file_names.begin(), delete_delta.file_names.end(),
+                      file_name) == delete_delta.file_names.end()) {
+            // This delete delta has no file named like our data file.
+            continue;
+        }
+
+        TFileRangeDesc delete_range;
+        delete_range.path = fmt::format("{}/{}", delete_delta.directory_location, file_name);
+        delete_range.start_offset = 0;
+        delete_range.size = -1;
+        delete_range.file_size = -1;
+
+        OrcReader delete_reader(_profile, _state, _params, delete_range, _MIN_BATCH_SIZE,
+                                _state->timezone(), _io_ctx, false);
+        RETURN_IF_ERROR(delete_reader.init_reader(
+                &TransactionalHive::DELETE_ROW_COLUMN_NAMES_LOWER_CASE, nullptr, {}, false));
+
+        // Delete files have neither partition nor missing columns to fill.
+        std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+                partition_columns;
+        std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+        RETURN_IF_ERROR(delete_reader.set_fill_columns(partition_columns, missing_columns));
+
+        // Column positions follow the insertion order of DELETE_ROW_PARAMS below.
+        constexpr size_t ORIGINAL_TRANSACTION_INDEX = 0;
+        constexpr size_t BUCKET_ID_INDEX = 1;
+        constexpr size_t ROW_ID_INDEX = 2;
+
+        bool eof = false;
+        while (!eof) {
+            Block block;
+            for (const auto& param : TransactionalHive::DELETE_ROW_PARAMS) {
+                DataTypePtr data_type =
+                        DataTypeFactory::instance().create_data_type(param.type, false);
+                MutableColumnPtr data_column = data_type->create_column();
+                block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+                                                   param.column_lower_case));
+            }
+            size_t read_rows = 0;
+            RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+            if (read_rows == 0) {
+                continue;
+            }
+
+            const auto& original_transaction_column = assert_cast<const ColumnInt64&>(
+                    *block.get_by_position(ORIGINAL_TRANSACTION_INDEX).column);
+            const auto& bucket_id_column = assert_cast<const ColumnInt32&>(
+                    *block.get_by_position(BUCKET_ID_INDEX).column);
+            const auto& row_id_column = assert_cast<const ColumnInt64&>(
+                    *block.get_by_position(ROW_ID_INDEX).column);
+
+            DCHECK_EQ(original_transaction_column.size(), read_rows);
+            DCHECK_EQ(bucket_id_column.size(), read_rows);
+            DCHECK_EQ(row_id_column.size(), read_rows);
+
+            for (size_t i = 0; i < read_rows; ++i) {
+                Int64 original_transaction = original_transaction_column.get_int(i);
+                Int32 bucket_id = bucket_id_column.get_int(i);
+                Int64 row_id = row_id_column.get_int(i);
+                AcidRowID delete_row_id = {original_transaction, bucket_id, row_id};
+                _delete_rows.insert(delete_row_id);
+                ++num_delete_rows;
+            }
+        }
+        ++num_delete_files;
+    }
+    if (num_delete_rows > 0) {
+        orc_reader->set_delete_rows(&_delete_rows);
+        COUNTER_UPDATE(_transactional_orc_profile.num_delete_files, num_delete_files);
+        COUNTER_UPDATE(_transactional_orc_profile.num_delete_rows, num_delete_rows);
+    }
+    return Status::OK();
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/transactional_hive_reader.h b/be/src/vec/exec/format/table/transactional_hive_reader.h
new file mode 100644
index 0000000000..b19102a366
--- /dev/null
+++ b/be/src/vec/exec/format/table/transactional_hive_reader.h
@@ -0,0 +1,144 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stddef.h>
+#include <stdint.h>
+
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "table_format_reader.h"
+#include "util/runtime_profile.h"
+#include "vec/columns/column_dictionary.h"
+#include "vec/common/hash_table/phmap_fwd_decl.h"
+
+namespace doris {
+class RuntimeState;
+class SlotDescriptor;
+class TFileRangeDesc;
+class TFileScanRangeParams;
+
+namespace io {
+class IOContext;
+} // namespace io
+struct TypeDescriptor;
+
+namespace vectorized {
+class Block;
+class GenericReader;
+class ShardedKVCache;
+class VExprContext;
+
+// Table-format reader for transactional (ACID) Hive tables. It wraps the file-format reader
+// passed to its constructor; row ids read from delete-delta files are collected in
+// `_delete_rows`, which is handed to the ORC reader via OrcReader::set_delete_rows().
+class TransactionalHiveReader : public TableFormatReader {
+    ENABLE_FACTORY_CREATOR(TransactionalHiveReader);
+
+public:
+    // Identity of a row in a Hive ACID file: (original transaction, bucket, row id).
+    struct AcidRowID {
+        int64_t original_transaction;
+        int32_t bucket;
+        int64_t row_id;
+
+        // Combines the hashes of the three fields with boost::hash_combine-style mixing.
+        struct Hash {
+            size_t operator()(const AcidRowID& transactional_row_id) const {
+                size_t hash_value = 0;
+                hash_value ^= std::hash<int64_t> {}(transactional_row_id.original_transaction) +
+                              0x9e3779b9 + (hash_value << 6) + (hash_value >> 2);
+                hash_value ^= std::hash<int32_t> {}(transactional_row_id.bucket) + 0x9e3779b9 +
+                              (hash_value << 6) + (hash_value >> 2);
+                hash_value ^= std::hash<int64_t> {}(transactional_row_id.row_id) + 0x9e3779b9 +
+                              (hash_value << 6) + (hash_value >> 2);
+                return hash_value;
+            }
+        };
+
+        // Field-wise equality, consistent with Hash.
+        struct Eq {
+            bool operator()(const AcidRowID& lhs, const AcidRowID& rhs) const {
+                return lhs.original_transaction == rhs.original_transaction &&
+                       lhs.bucket == rhs.bucket && lhs.row_id == rhs.row_id;
+            }
+        };
+    };
+
+    using AcidRowIDSet = vectorized::flat_hash_set<AcidRowID, AcidRowID::Hash, AcidRowID::Eq>;
+
+    TransactionalHiveReader(std::unique_ptr<GenericReader> file_format_reader,
+                            RuntimeProfile* profile, RuntimeState* state,
+                            const TFileScanRangeParams& params, const TFileRangeDesc& range,
+                            io::IOContext* io_ctx);
+    ~TransactionalHiveReader() override = default;
+
+    Status init_row_filters(const TFileRangeDesc& range) override;
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+
+    Status set_fill_columns(
+            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+                    partition_columns,
+            const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;
+
+    bool fill_all_columns() const override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_reader(
+            const std::vector<std::string>& column_names,
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
+            const VExprContextSPtrs& conjuncts);
+
+private:
+    struct TransactionalHiveProfile {
+        // Default to nullptr so these pointers are never indeterminate before registration.
+        RuntimeProfile::Counter* num_delete_files = nullptr;
+        RuntimeProfile::Counter* num_delete_rows = nullptr;
+        RuntimeProfile::Counter* delete_files_read_time = nullptr;
+    };
+
+    RuntimeProfile* _profile = nullptr;
+    RuntimeState* _state = nullptr;
+    const TFileScanRangeParams& _params;
+    const TFileRangeDesc& _range;
+    TransactionalHiveProfile _transactional_orc_profile;
+    // Deleted row ids; the wrapped ORC reader receives a pointer to this set, so it must stay
+    // alive (and at a stable address) for as long as that reader uses it.
+    AcidRowIDSet _delete_rows;
+    std::unique_ptr<IColumn::Filter> _delete_rows_filter_ptr = nullptr;
+    std::vector<std::string> _col_names;
+
+    io::IOContext* _io_ctx = nullptr;
+};
+
+// Lexicographic order on (original_transaction, bucket, row_id).
+inline bool operator<(const TransactionalHiveReader::AcidRowID& lhs,
+                      const TransactionalHiveReader::AcidRowID& rhs) {
+    return std::tie(lhs.original_transaction, lhs.bucket, lhs.row_id) <
+           std::tie(rhs.original_transaction, rhs.bucket, rhs.row_id);
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 8772d14802..bca5cc550d 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -60,6 +60,7 @@
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
+#include "vec/exec/format/table/transactional_hive_reader.h"
#include "vec/exec/scan/hudi_jni_reader.h"
#include "vec/exec/scan/max_compute_jni_reader.h"
#include "vec/exec/scan/new_file_scan_node.h"
@@ -651,6 +652,9 @@ Status VFileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_ORC: {
+ std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
+ _profile, _state, _params, range, _state->query_options().batch_size,
+ _state->timezone(), _io_ctx.get(), _state->query_options().enable_orc_lazy_mat);
if (!_is_load && _push_down_conjuncts.empty() && !_conjuncts.empty()) {
_push_down_conjuncts.resize(_conjuncts.size());
for (size_t i = 0; i != _conjuncts.size(); ++i) {
@@ -658,12 +662,21 @@ Status VFileScanner::_get_next_reader() {
}
_discard_conjuncts();
}
- _cur_reader = OrcReader::create_unique(
- _profile, _state, _params, range, _file_col_names,
- _state->query_options().batch_size, _state->timezone(), _io_ctx.get(),
- _state->query_options().enable_orc_lazy_mat);
- init_status = ((OrcReader*)(_cur_reader.get()))
- ->init_reader(_colname_to_value_range, _push_down_conjuncts);
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "transactional_hive") {
+ std::unique_ptr<TransactionalHiveReader> tran_orc_reader =
+ TransactionalHiveReader::create_unique(std::move(orc_reader), _profile,
+ _state, _params, range,
+ _io_ctx.get());
+ init_status = tran_orc_reader->init_reader(_file_col_names, _colname_to_value_range,
+ _push_down_conjuncts);
+ RETURN_IF_ERROR(tran_orc_reader->init_row_filters(range));
+ _cur_reader = std::move(tran_orc_reader);
+ } else {
+ init_status = orc_reader->init_reader(&_file_col_names, _colname_to_value_range,
+ _push_down_conjuncts, false);
+ _cur_reader = std::move(orc_reader);
+ }
break;
}
case TFileFormatType::FORMAT_CSV_PLAIN:
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 88796ecc24..50d17ff531 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -221,6 +221,10 @@ Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
for (size_t i = 0; i < size; ++i) {
result_filter_data[i] &= filter_data[i];
}
+ if (memchr(result_filter_data, 0x1, size) == nullptr) {
+ *can_filter_all = true;
+ return Status::OK();
+ }
}
}
return Status::OK();
diff --git a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
index 4c6108e9d4..69c58d0ac8 100644
--- a/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
+++ b/docker/thirdparties/docker-compose/hive/scripts/create_preinstalled_table.hql
@@ -621,4 +621,43 @@ insert into `schema_evo_test_orc` select 1, "kaka";
alter table `schema_evo_test_orc` ADD COLUMNS (`ts` timestamp);
insert into `schema_evo_test_orc` select 2, "messi", from_unixtime(to_unix_timestamp('20230101 13:01:03','yyyyMMdd HH:mm:ss'));
+-- The docker environment currently runs Hive 2.x. Hive 2.x full-acid tables can only be read
+-- after a major compaction, so each table below is major-compacted after its DML.
+-- NOTE(review): `ALTER TABLE ... COMPACT` presumably only enqueues the compaction; confirm it
+-- has finished before the regression tests read these tables.
+SET hive.support.concurrency=true;
+SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+-- Unpartitioned, bucketed full-acid table with one updated row (id = 3).
+create table orc_full_acid (id INT, value STRING)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+
+insert into orc_full_acid values
+(1, 'A'),
+(2, 'B'),
+(3, 'C');
+
+update orc_full_acid set value = 'CC' where id = 3;
+
+alter table orc_full_acid compact 'major';
+
+-- Partitioned variant: two partitions, one updated row (id = 2, in part_col=20230101).
+create table orc_full_acid_par (id INT, value STRING)
+PARTITIONED BY (part_col INT)
+CLUSTERED BY (id) INTO 3 BUCKETS
+STORED AS ORC
+TBLPROPERTIES ('transactional' = 'true');
+
+insert into orc_full_acid_par PARTITION(part_col=20230101) values
+(1, 'A'),
+(2, 'B'),
+(3, 'C');
+
+insert into orc_full_acid_par PARTITION(part_col=20230102) values
+(4, 'D'),
+(5, 'E'),
+(6, 'F');
+
+update orc_full_acid_par set value = 'BB' where id = 2;
+
+-- Compaction is requested per partition.
+alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
+alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';
+
show tables;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
index 98c941e987..64e06783c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/HMSExternalTable.java
@@ -164,27 +164,6 @@ public class HMSExternalTable extends ExternalTable {
* Support managed_table and external_table.
*/
private boolean supportedHiveTable() {
- boolean isTxnTbl = AcidUtils.isTransactionalTable(remoteTable);
- if (isTxnTbl) {
- // Only support "insert_only" transactional table
- // There are 2 types of parameter:
- // "transactional_properties" = "insert_only",
- // or,
- // "insert_only" = "true"
- // And must check "insert_only" first, because "transactional_properties" may be "default"
- Map<String, String> parameters = remoteTable.getParameters();
- if (parameters.containsKey(TBL_PROP_INSERT_ONLY)) {
- if (!parameters.get(TBL_PROP_INSERT_ONLY).equalsIgnoreCase("true")) {
- return false;
- }
- } else if (parameters.containsKey(TBL_PROP_TXN_PROPERTIES)) {
- if (!parameters.get(TBL_PROP_TXN_PROPERTIES).equalsIgnoreCase(TBL_PROP_INSERT_ONLY)) {
- return false;
- }
- } else {
- return false;
- }
- }
String inputFileFormat = remoteTable.getSd().getInputFormat();
boolean supportedFileFormat = inputFileFormat != null && SUPPORTED_HIVE_FILE_FORMATS.contains(inputFileFormat);
LOG.debug("hms table {} is {} with file format: {}", name, remoteTable.getTableType(), inputFileFormat);
@@ -215,6 +194,10 @@ public class HMSExternalTable extends ExternalTable {
return dlaType == DLAType.HIVE && AcidUtils.isTransactionalTable(remoteTable);
}
+ /**
+ * Returns true if this is a Hive table that {@code AcidUtils.isFullAcidTable} reports as a
+ * full-ACID transactional table.
+ */
+ public boolean isFullAcidTable() {
+ return dlaType == DLAType.HIVE && AcidUtils.isFullAcidTable(remoteTable);
+ }
+
@Override
public boolean isView() {
makeSureInitialized();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index d19693e9f4..60f454999e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -114,17 +114,20 @@ public class BrokerUtil {
public static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath)
throws UserException {
- return parseColumnsFromPath(filePath, columnsFromPath, true);
+ return parseColumnsFromPath(filePath, columnsFromPath, true, false);
}
public static List<String> parseColumnsFromPath(
String filePath,
List<String> columnsFromPath,
- boolean caseSensitive)
+ boolean caseSensitive,
+ boolean isACID)
throws UserException {
if (columnsFromPath == null || columnsFromPath.isEmpty()) {
return Collections.emptyList();
}
+ // if it is ACID, the path count is 3. The hdfs path is hdfs://xxx/table_name/par=xxx/delta(or base)_xxx/.
+ int pathCount = isACID ? 3 : 2;
if (!caseSensitive) {
for (int i = 0; i < columnsFromPath.size(); i++) {
String path = columnsFromPath.remove(i);
@@ -138,7 +141,7 @@ public class BrokerUtil {
}
String[] columns = new String[columnsFromPath.size()];
int size = 0;
- for (int i = strings.length - 2; i >= 0; i--) {
+ for (int i = strings.length - pathCount; i >= 0; i--) {
String str = strings[i];
if (str != null && str.isEmpty()) {
continue;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidInfo.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidInfo.java
new file mode 100644
index 0000000000..c49855fbd1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/AcidInfo.java
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.hive;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Stores information about the ACID properties of a partition: the partition location and the
+ * delete-delta directories (with their file names) that belong to it.
+ *
+ * <p>No field can be reassigned after construction; the lists are stored as passed in and are
+ * not copied.
+ */
+public class AcidInfo {
+    private final String partitionLocation;
+    private final List<DeleteDeltaInfo> deleteDeltas;
+
+    /**
+     * @param partitionLocation location of the partition
+     * @param deleteDeltas delete-delta directories of the partition (stored without copying)
+     */
+    public AcidInfo(String partitionLocation, List<DeleteDeltaInfo> deleteDeltas) {
+        this.partitionLocation = partitionLocation;
+        this.deleteDeltas = deleteDeltas;
+    }
+
+    public String getPartitionLocation() {
+        return partitionLocation;
+    }
+
+    public List<DeleteDeltaInfo> getDeleteDeltas() {
+        return deleteDeltas;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        AcidInfo acidInfo = (AcidInfo) o;
+        return Objects.equals(partitionLocation, acidInfo.partitionLocation)
+                && Objects.equals(deleteDeltas, acidInfo.deleteDeltas);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(partitionLocation, deleteDeltas);
+    }
+
+    @Override
+    public String toString() {
+        final StringBuilder sb = new StringBuilder("AcidInfo{");
+        sb.append("partitionLocation='").append(partitionLocation).append('\'');
+        sb.append(", deleteDeltas=").append(deleteDeltas);
+        sb.append('}');
+        return sb.toString();
+    }
+
+    /**
+     * A delete-delta directory and the names of the files in it. Fields are final so that
+     * equals/hashCode cannot change after the object is stored in a collection.
+     */
+    public static class DeleteDeltaInfo {
+        private final String directoryLocation;
+        private final List<String> fileNames;
+
+        /**
+         * @param location location of the delete-delta directory
+         * @param filenames names of the files in that directory (stored without copying)
+         */
+        public DeleteDeltaInfo(String location, List<String> filenames) {
+            this.directoryLocation = location;
+            this.fileNames = filenames;
+        }
+
+        public String getDirectoryLocation() {
+            return directoryLocation;
+        }
+
+        public List<String> getFileNames() {
+            return fileNames;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            DeleteDeltaInfo that = (DeleteDeltaInfo) o;
+            return Objects.equals(directoryLocation, that.directoryLocation)
+                    && Objects.equals(fileNames, that.fileNames);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(directoryLocation, fileNames);
+        }
+
+        @Override
+        public String toString() {
+            final StringBuilder sb = new StringBuilder("DeleteDeltaInfo{");
+            sb.append("directoryLocation='").append(directoryLocation).append('\'');
+            sb.append(", fileNames=").append(fileNames);
+            sb.append('}');
+            return sb.toString();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
index 25ec9055ed..cbc8631883 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java
@@ -18,6 +18,8 @@
package org.apache.doris.datasource.hive;
import org.apache.doris.analysis.PartitionValue;
+import org.apache.doris.backup.Status;
+import org.apache.doris.backup.Status.ErrCode;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.ListPartitionItem;
import org.apache.doris.catalog.PartitionItem;
@@ -30,6 +32,7 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.datasource.CacheException;
import org.apache.doris.datasource.HMSExternalCatalog;
+import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.external.hive.util.HiveUtil;
import org.apache.doris.fs.FileSystemFactory;
import org.apache.doris.fs.RemoteFiles;
@@ -78,6 +81,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -101,6 +105,8 @@ public class HiveMetaStoreCache {
// After hive 3, transactional table's will have file '_orc_acid_version' with value >= '2'.
public static final String HIVE_ORC_ACID_VERSION_FILE = "_orc_acid_version";
+ private static final String HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX = "bucket_";
+
private HMSExternalCatalog catalog;
private Executor executor;
@@ -662,7 +668,8 @@ public class HiveMetaStoreCache {
return fileCacheRef;
}
- public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds) {
+ public List<FileCacheValue> getFilesByTransaction(List<HivePartition> partitions, ValidWriteIdList validWriteIds,
+ boolean isFullAcid) {
List<FileCacheValue> fileCacheValues = Lists.newArrayList();
JobConf jobConf = getJobConf();
String remoteUser = jobConf.get(HdfsResource.HADOOP_USER_NAME);
@@ -682,12 +689,47 @@ public class HiveMetaStoreCache {
throw new Exception("Original non-ACID files in transactional tables are not supported");
}
+ if (isFullAcid) {
+ int acidVersion = 2;
+ /**
+ * From Hive version >= 3.0, delta/base files will always have file '_orc_acid_version'
+ * with value >= '2'.
+ */
+ Path baseOrDeltaPath = directory.getBaseDirectory() != null ? directory.getBaseDirectory() :
+ !directory.getCurrentDirectories().isEmpty() ? directory.getCurrentDirectories().get(0)
+ .getPath() : null;
+ String acidVersionPath = new Path(baseOrDeltaPath, "_orc_acid_version").toUri().toString();
+ RemoteFileSystem fs = FileSystemFactory.getByLocation(baseOrDeltaPath.toUri().toString(), jobConf);
+ Status status = fs.exists(acidVersionPath);
+ if (status != Status.OK) {
+ if (status.getErrCode() == ErrCode.NOT_FOUND) {
+ acidVersion = 0;
+ } else {
+ throw new Exception(String.format("Failed to check remote path {} exists.",
+ acidVersionPath));
+ }
+ }
+ if (acidVersion == 0 && !directory.getCurrentDirectories().isEmpty()) {
+ throw new Exception(
+ "Hive 2.x versioned full-acid tables need to run major compaction.");
+ }
+ }
+
// delta directories
+ List<DeleteDeltaInfo> deleteDeltas = new ArrayList<>();
for (AcidUtils.ParsedDelta delta : directory.getCurrentDirectories()) {
String location = delta.getPath().toString();
RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
- locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE))
+ if (delta.isDeleteDelta()) {
+ List<String> deleteDeltaFileNames = locatedFiles.files().stream().map(f -> f.getName()).filter(
+ name -> name.startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
+ .collect(Collectors.toList());
+ deleteDeltas.add(new DeleteDeltaInfo(location, deleteDeltaFileNames));
+ continue;
+ }
+ locatedFiles.files().stream().filter(
+ f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
}
@@ -696,9 +738,11 @@ public class HiveMetaStoreCache {
String location = directory.getBaseDirectory().toString();
RemoteFileSystem fs = FileSystemFactory.getByLocation(location, jobConf);
RemoteFiles locatedFiles = fs.listLocatedFiles(location, true, false);
- locatedFiles.files().stream().filter(f -> !f.getName().equals(HIVE_ORC_ACID_VERSION_FILE))
+ locatedFiles.files().stream().filter(
+ f -> f.getName().startsWith(HIVE_TRANSACTIONAL_ORC_BUCKET_PREFIX))
.forEach(fileCacheValue::addFile);
}
+ fileCacheValue.setAcidInfo(new AcidInfo(partition.getPath(), deleteDeltas));
fileCacheValues.add(fileCacheValue);
}
} catch (Exception e) {
@@ -854,6 +898,8 @@ public class HiveMetaStoreCache {
// partitionValues would be ["part1", "part2"]
protected List<String> partitionValues;
+ private AcidInfo acidInfo;
+
public void addFile(RemoteFile file) {
if (files == null) {
files = Lists.newArrayList();
@@ -877,6 +923,15 @@ public class HiveMetaStoreCache {
public int getValuesSize() {
return partitionValues == null ? 0 : partitionValues.size();
}
+
+        /** Returns the ACID info of these files, or null if it was never set. */
+        public AcidInfo getAcidInfo() {
+            return acidInfo;
+        }
+
+        /** Sets the ACID info (partition location and delete deltas) of these files. */
+        public void setAcidInfo(AcidInfo acidInfo) {
+            this.acidInfo = acidInfo;
+        }
}
@Data
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
index 7919d451a2..3f3c2f91d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveTransaction.java
@@ -37,15 +37,18 @@ public class HiveTransaction {
private final String user;
private final HMSExternalTable hiveTable;
+ private final boolean isFullAcid;
+
private long txnId;
private List<String> partitionNames = Lists.newArrayList();
ValidWriteIdList validWriteIdList = null;
- public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable) {
+ /**
+ * @param isFullAcid stored as-is and returned by {@code isFullAcid()}
+ */
+ public HiveTransaction(String queryId, String user, HMSExternalTable hiveTable, boolean isFullAcid) {
this.queryId = queryId;
this.user = user;
this.hiveTable = hiveTable;
+ this.isFullAcid = isFullAcid;
}
public String getQueryId() {
@@ -56,6 +59,10 @@ public class HiveTransaction {
this.partitionNames.add(partitionName);
}
+ /** Returns the {@code isFullAcid} flag given to the constructor. */
+ public boolean isFullAcid() {
+ return isFullAcid;
+ }
+
public ValidWriteIdList getValidWriteIds(PooledHiveMetaStoreClient client) {
if (validWriteIdList == null) {
TableName tableName = new TableName(hiveTable.getCatalog().getName(), hiveTable.getDbName(),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
index e26a4c9284..9f9a1a457f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/PooledHiveMetaStoreClient.java
@@ -27,6 +27,8 @@ import com.aliyun.datalake.metastore.hive2.ProxyMetaStoreClient;
import com.amazonaws.glue.catalog.metastore.AWSCatalogMetastoreClient;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.BitSet;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -85,7 +88,12 @@ public class PooledHiveMetaStoreClient {
public List<String> getAllDatabases() {
try (CachedClient client = getClient()) {
- return client.client.getAllDatabases();
+ try {
+ return client.client.getAllDatabases();
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to get all database from hms client", e);
}
@@ -93,7 +101,12 @@ public class PooledHiveMetaStoreClient {
public List<String> getAllTables(String dbName) {
try (CachedClient client = getClient()) {
- return client.client.getAllTables(dbName);
+ try {
+ return client.client.getAllTables(dbName);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to get all tables for db %s", e, dbName);
}
@@ -101,7 +114,12 @@ public class PooledHiveMetaStoreClient {
public boolean tableExists(String dbName, String tblName) {
try (CachedClient client = getClient()) {
- return client.client.tableExists(dbName, tblName);
+ try {
+ return client.client.tableExists(dbName, tblName);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to check if table %s in db %s exists", e, tblName, dbName);
}
@@ -109,7 +127,12 @@ public class PooledHiveMetaStoreClient {
public List<String> listPartitionNames(String dbName, String tblName) {
try (CachedClient client = getClient()) {
- return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+ try {
+ return client.client.listPartitionNames(dbName, tblName, MAX_LIST_PARTITION_NUM);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to list partition names for table %s in db %s", e, tblName, dbName);
}
@@ -117,7 +140,12 @@ public class PooledHiveMetaStoreClient {
public Partition getPartition(String dbName, String tblName, List<String> partitionValues) {
try (CachedClient client = getClient()) {
- return client.client.getPartition(dbName, tblName, partitionValues);
+ try {
+ return client.client.getPartition(dbName, tblName, partitionValues);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to get partition for table %s in db %s with value %s", e, tblName,
dbName, partitionValues);
@@ -126,7 +154,12 @@ public class PooledHiveMetaStoreClient {
public List<Partition> getPartitionsByFilter(String dbName, String tblName, String filter) {
try (CachedClient client = getClient()) {
- return client.client.listPartitionsByFilter(dbName, tblName, filter, MAX_LIST_PARTITION_NUM);
+ try {
+ return client.client.listPartitionsByFilter(dbName, tblName, filter, MAX_LIST_PARTITION_NUM);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to get partition by filter for table %s in db %s", e, tblName,
dbName);
@@ -135,7 +168,12 @@ public class PooledHiveMetaStoreClient {
public Table getTable(String dbName, String tblName) {
try (CachedClient client = getClient()) {
- return client.client.getTable(dbName, tblName);
+ try {
+ return client.client.getTable(dbName, tblName);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to get table %s in db %s from hms client", e, tblName, dbName);
}
@@ -143,7 +181,12 @@ public class PooledHiveMetaStoreClient {
public List<FieldSchema> getSchema(String dbName, String tblName) {
try (CachedClient client = getClient()) {
- return client.client.getSchema(dbName, tblName);
+ try {
+ return client.client.getSchema(dbName, tblName);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new HMSClientException("failed to get schema for table %s in db %s", e, tblName, dbName);
}
@@ -151,7 +194,12 @@ public class PooledHiveMetaStoreClient {
public List<ColumnStatisticsObj> getTableColumnStatistics(String dbName, String tblName, List<String> columns) {
try (CachedClient client = getClient()) {
- return client.client.getTableColumnStatistics(dbName, tblName, columns);
+ try {
+ return client.client.getTableColumnStatistics(dbName, tblName, columns);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -160,7 +208,12 @@ public class PooledHiveMetaStoreClient {
public Map<String, List<ColumnStatisticsObj>> getPartitionColumnStatistics(
String dbName, String tblName, List<String> partNames, List<String> columns) {
try (CachedClient client = getClient()) {
- return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
+ try {
+ return client.client.getPartitionColumnStatistics(dbName, tblName, partNames, columns);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -168,7 +221,12 @@ public class PooledHiveMetaStoreClient {
public CurrentNotificationEventId getCurrentNotificationEventId() {
try (CachedClient client = getClient()) {
- return client.client.getCurrentNotificationEventId();
+ try {
+ return client.client.getCurrentNotificationEventId();
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
LOG.warn("Failed to fetch current notification event id", e);
throw new MetastoreNotificationFetchException(
@@ -181,7 +239,12 @@ public class PooledHiveMetaStoreClient {
IMetaStoreClient.NotificationFilter filter)
throws MetastoreNotificationFetchException {
try (CachedClient client = getClient()) {
- return client.client.getNextNotification(lastEventId, maxEvents, filter);
+ try {
+ return client.client.getNextNotification(lastEventId, maxEvents, filter);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
LOG.warn("Failed to get next notification based on last event id {}", lastEventId, e);
throw new MetastoreNotificationFetchException(
@@ -192,7 +255,12 @@ public class PooledHiveMetaStoreClient {
public long openTxn(String user) {
try (CachedClient client = getClient()) {
- return client.client.openTxn(user);
+ try {
+ return client.client.openTxn(user);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to open transaction", e);
}
@@ -200,7 +268,12 @@ public class PooledHiveMetaStoreClient {
public void commitTxn(long txnId) {
try (CachedClient client = getClient()) {
- client.client.commitTxn(txnId);
+ try {
+ client.client.commitTxn(txnId);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to commit transaction " + txnId, e);
}
@@ -214,7 +287,13 @@ public class PooledHiveMetaStoreClient {
request.addLockComponent(component);
}
try (CachedClient client = getClient()) {
- LockResponse response = client.client.lock(request.build());
+ LockResponse response;
+ try {
+ response = client.client.lock(request.build());
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
long start = System.currentTimeMillis();
while (response.getState() == LockState.WAITING) {
long lockId = response.getLockid();
@@ -236,28 +315,40 @@ public class PooledHiveMetaStoreClient {
public ValidWriteIdList getValidWriteIds(String fullTableName, long currentTransactionId) {
try (CachedClient client = getClient()) {
- // Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
- // Do not pass currentTransactionId instead as it will break Hive's listing of delta directories if major compaction
- // deletes delta directories for valid transactions that existed at the time transaction is opened
- ValidTxnList validTransactions = client.client.getValidTxns();
- List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
- Collections.singletonList(fullTableName), validTransactions.toString());
- if (tableValidWriteIdsList.size() != 1) {
- throw new Exception("tableValidWriteIdsList's size should be 1");
+ try {
+ // Pass currentTxn as 0L to get the recent snapshot of valid transactions in Hive
+ // Do not pass currentTransactionId instead as it will break Hive's listing of delta directories if major compaction
+ // deletes delta directories for valid transactions that existed at the time transaction is opened
+ ValidTxnList validTransactions = client.client.getValidTxns();
+ List<TableValidWriteIds> tableValidWriteIdsList = client.client.getValidWriteIds(
+ Collections.singletonList(fullTableName), validTransactions.toString());
+ if (tableValidWriteIdsList.size() != 1) {
+ throw new Exception("tableValidWriteIdsList's size should be 1");
+ }
+ ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId,
+ tableValidWriteIdsList);
+ ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
+ return writeIdList;
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
}
- ValidTxnWriteIdList validTxnWriteIdList = TxnUtils.createValidTxnWriteIdList(currentTransactionId,
- tableValidWriteIdsList);
- ValidWriteIdList writeIdList = validTxnWriteIdList.getTableValidWriteIdList(fullTableName);
- return writeIdList;
- } catch (Exception e) {
- throw new RuntimeException(
- "failed to get valid write ids for " + fullTableName + ", transaction " + currentTransactionId, e);
+ } catch (Exception e) {
+ // Ignore this exception when the version of hive is not compatible with these apis.
+ // Currently, the workaround is using a max watermark.
+ LOG.warn("failed to get valid write ids for {}, transaction {}", fullTableName, currentTransactionId, e);
+ return new ValidReaderWriteIdList(fullTableName, new long[0], new BitSet(), Long.MAX_VALUE);
}
}
private LockResponse checkLock(long lockId) {
try (CachedClient client = getClient()) {
- return client.client.checkLock(lockId);
+ try {
+ return client.client.checkLock(lockId);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to check lock " + lockId, e);
}
@@ -289,7 +380,12 @@ public class PooledHiveMetaStoreClient {
public void heartbeatForTxn(long txnId) {
try (CachedClient client = getClient()) {
- client.client.heartbeat(txnId, txnId);
+ try {
+ client.client.heartbeat(txnId, txnId);
+ } catch (Exception e) {
+ client.setThrowable(e);
+ throw e;
+ }
} catch (Exception e) {
throw new RuntimeException("failed to do heartbeat for transaction " + txnId, e);
}
@@ -297,6 +393,7 @@ public class PooledHiveMetaStoreClient {
private class CachedClient implements AutoCloseable {
private final IMetaStoreClient client;
+ private volatile Throwable throwable;
private CachedClient(HiveConf hiveConf) throws MetaException {
String type = hiveConf.get(HMSProperties.HIVE_METASTORE_TYPE);
@@ -312,10 +409,14 @@ public class PooledHiveMetaStoreClient {
}
}
+ public void setThrowable(Throwable throwable) {
+ this.throwable = throwable;
+ }
+
@Override
public void close() throws Exception {
synchronized (clientPool) {
- if (clientPool.size() > poolSize) {
+ if (throwable != null || clientPool.size() > poolSize) {
client.close();
} else {
clientPool.offer(this);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
index 3d5a08e15a..498401b05b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileQueryScanNode.java
@@ -34,6 +34,8 @@ import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.S3Util;
import org.apache.doris.common.util.Util;
+import org.apache.doris.datasource.hive.AcidInfo;
+import org.apache.doris.datasource.hive.AcidInfo.DeleteDeltaInfo;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.external.hudi.HudiScanNode;
@@ -60,6 +62,9 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+import org.apache.doris.thrift.TTransactionalHiveDeleteDeltaDesc;
+import org.apache.doris.thrift.TTransactionalHiveDesc;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@@ -68,6 +73,7 @@ import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -274,11 +280,35 @@ public abstract class FileQueryScanNode extends FileScanNode {
// If fileSplit has partition values, use the values collected from hive partitions.
// Otherwise, use the values in file path.
+ boolean isACID = false;
+ if (fileSplit instanceof HiveSplit) {
+ HiveSplit hiveSplit = (HiveSplit) split;
+ isACID = hiveSplit.isACID();
+ }
List<String> partitionValuesFromPath = fileSplit.getPartitionValues() == null
- ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false)
+ ? BrokerUtil.parseColumnsFromPath(fileSplit.getPath().toString(), pathPartitionKeys, false, isACID)
: fileSplit.getPartitionValues();
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
+ if (isACID) {
+ HiveSplit hiveSplit = (HiveSplit) split;
+ hiveSplit.setTableFormatType(TableFormatType.TRANSACTIONAL_HIVE);
+ TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+ tableFormatFileDesc.setTableFormatType(hiveSplit.getTableFormatType().value());
+ AcidInfo acidInfo = (AcidInfo) hiveSplit.getInfo();
+ TTransactionalHiveDesc transactionalHiveDesc = new TTransactionalHiveDesc();
+ transactionalHiveDesc.setPartition(acidInfo.getPartitionLocation());
+ List<TTransactionalHiveDeleteDeltaDesc> deleteDeltaDescs = new ArrayList<>();
+ for (DeleteDeltaInfo deleteDeltaInfo : acidInfo.getDeleteDeltas()) {
+ TTransactionalHiveDeleteDeltaDesc deleteDeltaDesc = new TTransactionalHiveDeleteDeltaDesc();
+ deleteDeltaDesc.setDirectoryLocation(deleteDeltaInfo.getDirectoryLocation());
+ deleteDeltaDesc.setFileNames(deleteDeltaInfo.getFileNames());
+ deleteDeltaDescs.add(deleteDeltaDesc);
+ }
+ transactionalHiveDesc.setDeleteDeltas(deleteDeltaDescs);
+ tableFormatFileDesc.setTransactionalHiveParams(transactionalHiveDesc);
+ rangeDesc.setTableFormatParams(tableFormatFileDesc);
+ }
// external data lake table
if (fileSplit instanceof IcebergSplit) {
// TODO: extract all data lake split to factory
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
index 694e97a7d9..9a050cc139 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.FileSplit.FileSplitCreator;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -203,6 +204,13 @@ public abstract class FileScanNode extends ExternalScanNode {
protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
long modificationTime, boolean splittable, List<String> partitionValues) throws IOException {
+ return splitFile(path, blockSize, blockLocations, length, modificationTime, splittable, partitionValues,
+ FileSplitCreator.DEFAULT);
+ }
+
+ protected List<Split> splitFile(Path path, long blockSize, BlockLocation[] blockLocations, long length,
+ long modificationTime, boolean splittable, List<String> partitionValues, SplitCreator splitCreator)
+ throws IOException {
if (blockLocations == null) {
blockLocations = new BlockLocation[0];
}
@@ -216,7 +224,7 @@ public abstract class FileScanNode extends ExternalScanNode {
if (!splittable) {
LOG.debug("Path {} is not splittable.", path);
String[] hosts = blockLocations.length == 0 ? null : blockLocations[0].getHosts();
- result.add(new FileSplit(path, 0, length, length, modificationTime, hosts, partitionValues));
+ result.add(splitCreator.create(path, 0, length, length, modificationTime, hosts, partitionValues));
return result;
}
long bytesRemaining;
@@ -224,13 +232,13 @@ public abstract class FileScanNode extends ExternalScanNode {
bytesRemaining -= splitSize) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
- result.add(new FileSplit(path, length - bytesRemaining, splitSize,
+ result.add(splitCreator.create(path, length - bytesRemaining, splitSize,
length, modificationTime, hosts, partitionValues));
}
if (bytesRemaining != 0L) {
int location = getBlockIndex(blockLocations, length - bytesRemaining);
String[] hosts = location == -1 ? null : blockLocations[location].getHosts();
- result.add(new FileSplit(path, length - bytesRemaining, bytesRemaining,
+ result.add(splitCreator.create(path, length - bytesRemaining, bytesRemaining,
length, modificationTime, hosts, partitionValues));
}
@@ -238,7 +246,7 @@ public abstract class FileScanNode extends ExternalScanNode {
return result;
}
- private int getBlockIndex(BlockLocation[] blkLocations, long offset) {
+ protected int getBlockIndex(BlockLocation[] blkLocations, long offset) {
if (blkLocations == null || blkLocations.length == 0) {
return -1;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
index 21b88f9c2b..4e9d0bae56 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/FileSplit.java
@@ -66,4 +66,15 @@ public class FileSplit implements Split {
public Object getInfo() {
return null;
}
+
+ public static class FileSplitCreator implements SplitCreator {
+
+ public static final FileSplitCreator DEFAULT = new FileSplitCreator();
+
+ @Override
+ public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
+ List<String> partitionValues) {
+ return new FileSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues);
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
index 79daa2df32..b8ac376307 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanNode.java
@@ -39,6 +39,7 @@ import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.datasource.hive.HiveTransaction;
import org.apache.doris.planner.ListPartitionPrunerV2;
import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.external.HiveSplit.HiveSplitCreator;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.spi.Split;
import org.apache.doris.statistics.StatisticalType;
@@ -104,7 +105,7 @@ public class HiveScanNode extends FileQueryScanNode {
if (hmsTable.isHiveTransactionalTable()) {
this.hiveTransaction = new HiveTransaction(DebugUtil.printId(ConnectContext.get().queryId()),
- ConnectContext.get().getQualifiedUser(), hmsTable);
+ ConnectContext.get().getQualifiedUser(), hmsTable, hmsTable.isFullAcidTable());
Env.getCurrentHiveTransactionMgr().register(hiveTransaction);
}
}
@@ -192,7 +193,8 @@ public class HiveScanNode extends FileQueryScanNode {
for (HiveMetaStoreCache.HiveFileStatus status : fileCacheValue.getFiles()) {
allFiles.addAll(splitFile(status.getPath(), status.getBlockSize(),
status.getBlockLocations(), status.getLength(), status.getModificationTime(),
- isSplittable, fileCacheValue.getPartitionValues()));
+ isSplittable, fileCacheValue.getPartitionValues(),
+ new HiveSplitCreator(fileCacheValue.getAcidInfo())));
}
}
}
@@ -208,7 +210,7 @@ public class HiveScanNode extends FileQueryScanNode {
}
ValidWriteIdList validWriteIds = hiveTransaction.getValidWriteIds(
((HMSExternalCatalog) hmsTable.getCatalog()).getClient());
- return cache.getFilesByTransaction(partitions, validWriteIds);
+ return cache.getFilesByTransaction(partitions, validWriteIds, hiveTransaction.isFullAcid());
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
new file mode 100644
index 0000000000..0f230c85f4
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner.external;
+
+import org.apache.doris.datasource.hive.AcidInfo;
+import org.apache.doris.spi.Split;
+
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+
+public class HiveSplit extends FileSplit {
+
+ public HiveSplit(Path path, long start, long length, long fileLength,
+ long modificationTime, String[] hosts, List<String> partitionValues, AcidInfo acidInfo) {
+ super(path, start, length, fileLength, modificationTime, hosts, partitionValues);
+ this.acidInfo = acidInfo;
+ }
+
+ public HiveSplit(Path path, long start, long length, long fileLength, String[] hosts, AcidInfo acidInfo) {
+ super(path, start, length, fileLength, hosts, null);
+ this.acidInfo = acidInfo;
+ }
+
+ @Override
+ public Object getInfo() {
+ return acidInfo;
+ }
+
+ private AcidInfo acidInfo;
+
+ public boolean isACID() {
+ return acidInfo != null;
+ }
+
+ public static class HiveSplitCreator implements SplitCreator {
+
+ private AcidInfo acidInfo;
+
+ public HiveSplitCreator(AcidInfo acidInfo) {
+ this.acidInfo = acidInfo;
+ }
+
+ public HiveSplitCreator() {
+ this(null);
+ }
+
+ @Override
+ public Split create(Path path, long start, long length, long fileLength, long modificationTime, String[] hosts,
+ List<String> partitionValues) {
+ return new HiveSplit(path, start, length, fileLength, modificationTime, hosts, partitionValues, acidInfo);
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitCreator.java
similarity index 72%
copy from fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitCreator.java
index f97c8ea1ec..ff9d4dd92e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/SplitCreator.java
@@ -17,19 +17,13 @@
package org.apache.doris.planner.external;
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi"),
- PAIMON("paimon");
+import org.apache.doris.spi.Split;
- private final String tableFormatType;
+import org.apache.hadoop.fs.Path;
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
- }
+import java.util.List;
- public String value() {
- return tableFormatType;
- }
+public interface SplitCreator {
+ Split create(Path path, long start, long length, long fileLength,
+ long modificationTime, String[] hosts, List<String> partitionValues);
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
index f97c8ea1ec..891e138db6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/TableFormatType.java
@@ -21,7 +21,8 @@ public enum TableFormatType {
HIVE("hive"),
ICEBERG("iceberg"),
HUDI("hudi"),
- PAIMON("paimon");
+ PAIMON("paimon"),
+ TRANSACTIONAL_HIVE("transactional_hive");
private final String tableFormatType;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index f3015f18cb..4844d2ed7f 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -314,11 +314,22 @@ struct THudiFileDesc {
10: optional list<string> nested_fields;
}
+struct TTransactionalHiveDeleteDeltaDesc {
+ 1: optional string directory_location
+ 2: optional list<string> file_names
+}
+
+struct TTransactionalHiveDesc {
+ 1: optional string partition
+ 2: optional list<TTransactionalHiveDeleteDeltaDesc> delete_deltas
+}
+
struct TTableFormatFileDesc {
1: optional string table_format_type
2: optional TIcebergFileDesc iceberg_params
3: optional THudiFileDesc hudi_params
4: optional TPaimonFileDesc paimon_params
+ 5: optional TTransactionalHiveDesc transactional_hive_params
}
struct TFileScanRangeParams {
diff --git a/regression-test/data/external_catalog_p0/hive/test_transactional_hive.out b/regression-test/data/external_catalog_p0/hive/test_transactional_hive.out
new file mode 100644
index 0000000000..e4c6a6c6d2
--- /dev/null
+++ b/regression-test/data/external_catalog_p0/hive/test_transactional_hive.out
@@ -0,0 +1,33 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q01 --
+1 A
+2 B
+3 CC
+
+-- !q02 --
+A
+B
+CC
+
+-- !q03 --
+3 CC
+
+-- !q01 --
+1 A 20230101
+2 BB 20230101
+3 C 20230101
+4 D 20230102
+5 E 20230102
+6 F 20230102
+
+-- !q02 --
+A
+BB
+C
+D
+E
+F
+
+-- !q03 --
+2 BB 20230101
+
diff --git a/regression-test/suites/external_catalog_p0/hive/test_transactional_hive.groovy b/regression-test/suites/external_catalog_p0/hive/test_transactional_hive.groovy
new file mode 100644
index 0000000000..d15f49749f
--- /dev/null
+++ b/regression-test/suites/external_catalog_p0/hive/test_transactional_hive.groovy
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_transactional_hive", "p0") {
+ def q01 = {
+ qt_q01 """
+ select * from orc_full_acid order by id;
+ """
+ qt_q02 """
+ select value from orc_full_acid order by id;
+ """
+ qt_q03 """
+ select * from orc_full_acid where value = 'CC' order by id;
+ """
+ }
+
+ def q01_par = {
+ qt_q01 """
+ select * from orc_full_acid_par order by id;
+ """
+ qt_q02 """
+ select value from orc_full_acid_par order by id;
+ """
+ qt_q03 """
+ select * from orc_full_acid_par where value = 'BB' order by id;
+ """
+ }
+ String enabled = context.config.otherConfigs.get("enableHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ String hms_port = context.config.otherConfigs.get("hms_port")
+ String catalog_name = "test_transactional_hive"
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """create catalog if not exists ${catalog_name} properties (
+ "type"="hms",
+ 'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}'
+ );"""
+ sql """use `${catalog_name}`.`default`"""
+
+ q01()
+ q01_par()
+
+ sql """drop catalog if exists ${catalog_name}"""
+ } finally {
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org