You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/19 05:10:44 UTC
[doris] 10/21: [fix](iceberg-v2) icebergv2 filter data path (#14470)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit d23e7905b09c1d1f4dfe4849d21e5a79034dbf4e
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Thu Dec 15 10:18:12 2022 +0800
[fix](iceberg-v2) icebergv2 filter data path (#14470)
1. An Iceberg v2 delete file may span many data paths, so the path of a file split is required as a predicate to filter rows of the delete file
- create delete file structure to save predicate parameters
- create predicate for file path
2. add some logging to print row ranges
3. fix a bug when creating file metadata
---
be/src/vec/exec/format/parquet/parquet_common.h | 8 ++
be/src/vec/exec/format/parquet/schema_desc.h | 4 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 10 +-
.../vec/exec/format/parquet/vparquet_page_index.h | 4 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 58 +++++++-----
be/src/vec/exec/format/parquet/vparquet_reader.h | 3 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 104 ++++++++++++++++-----
be/src/vec/exec/format/table/iceberg_reader.h | 26 ++++--
be/src/vec/exec/format/table/table_format_reader.h | 3 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 6 +-
.../planner/external/ExternalFileScanNode.java | 2 +-
.../apache/doris/planner/external/HiveSplit.java | 1 +
.../planner/external/IcebergScanProvider.java | 86 +++++++++++++++--
.../doris/planner/external/IcebergSplit.java | 5 +
.../doris/planner/external/QueryScanProvider.java | 9 +-
gensrc/thrift/PlanNodes.thrift | 7 +-
16 files changed, 252 insertions(+), 84 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index d124c2fb9a..611ce969f1 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -46,6 +46,14 @@ struct RowRange {
int64_t first_row;
int64_t last_row;
+
+ bool operator<(const RowRange& range) const { return first_row < range.first_row; }
+
+ std::string debug_string() const {
+ std::stringstream ss;
+ ss << "[" << first_row << "," << last_row << ")";
+ return ss.str();
+ }
};
struct ParquetReadColumn {
diff --git a/be/src/vec/exec/format/parquet/schema_desc.h b/be/src/vec/exec/format/parquet/schema_desc.h
index 73f9f97d97..c45e96f236 100644
--- a/be/src/vec/exec/format/parquet/schema_desc.h
+++ b/be/src/vec/exec/format/parquet/schema_desc.h
@@ -40,11 +40,13 @@ struct FieldSchema {
tparquet::Type::type physical_type;
// The index order in FieldDescriptor._physical_fields
int physical_column_index = -1;
-
int16_t definition_level = 0;
int16_t repetition_level = 0;
std::vector<FieldSchema> children;
+ FieldSchema() = default;
+ ~FieldSchema() = default;
+ FieldSchema(const FieldSchema& fieldSchema) = default;
std::string debug_string() const;
};
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index 235a5f3e3b..c0557c482a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -67,17 +67,17 @@ void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end
}
int index = _row_range_index;
while (index < _row_ranges->size()) {
- const RowRange& read_range = (*_row_ranges)[index];
- if (read_range.last_row <= start_index) {
+ const RowRange& row_range = (*_row_ranges)[index];
+ if (row_range.last_row <= start_index) {
index++;
_row_range_index++;
continue;
}
- if (read_range.first_row >= end_index) {
+ if (row_range.first_row >= end_index) {
break;
}
- int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row;
- int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index;
+ int64_t start = row_range.first_row < start_index ? start_index : row_range.first_row;
+ int64_t end = row_range.last_row < end_index ? row_range.last_row : end_index;
read_ranges.emplace_back(start, end);
index++;
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index 978a798bf4..4a27593abf 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -36,9 +36,9 @@ public:
std::vector<int>& skipped_ranges);
bool check_and_get_page_index_ranges(const std::vector<tparquet::ColumnChunk>& columns);
Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
- tparquet::ColumnIndex* _column_index);
+ tparquet::ColumnIndex* column_index);
Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
- tparquet::OffsetIndex* _offset_index);
+ tparquet::OffsetIndex* offset_index);
private:
friend class ParquetReader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 01c226b1e4..c8d4f6b3c4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -324,11 +324,13 @@ Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names,
_scan_range.start_offset,
_scan_range.file_size, 0, _file_reader));
}
- RETURN_IF_ERROR(_file_reader->open());
- if (_file_reader->size() == 0) {
- return Status::EndOfFile("Empty Parquet File");
+ if (_file_metadata == nullptr) {
+ RETURN_IF_ERROR(_file_reader->open());
+ if (_file_reader->size() == 0) {
+ return Status::EndOfFile("Empty Parquet File");
+ }
+ RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
}
- RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
_t_metadata = &_file_metadata->to_thrift();
_total_groups = _t_metadata->row_groups.size();
@@ -360,57 +362,64 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}
-void ParquetReader::merge_delete_row_ranges(const std::vector<RowRange>& delete_row_ranges) {
+void ParquetReader::merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges) {
if (_row_ranges.empty()) {
+ _current_group_reader->set_row_ranges(_row_ranges);
return;
}
- std::vector<RowRange> candidate_ranges;
- auto start_range = _row_ranges.begin();
if (!delete_row_ranges.empty()) {
+ std::vector<RowRange> candidate_ranges;
+ auto start_range = _row_ranges.begin();
auto delete_range = delete_row_ranges.begin();
- int64_t range_start_idx = start_range->first_row;
+ int64_t processed_range_start_idx = start_range->first_row;
while (start_range != _row_ranges.end() && delete_range != delete_row_ranges.end()) {
int64_t delete_start = delete_range->first_row;
int64_t delete_end = delete_range->last_row;
int64_t range_start = start_range->first_row;
int64_t range_end = start_range->last_row;
-
- if (delete_end >= range_end) {
- if (range_start < delete_start) {
+ if (delete_end > range_end) {
+ if (range_start < processed_range_start_idx) {
+ // rows before processed_range_start_idx have been processed
+ range_start = processed_range_start_idx;
+ }
+ if (range_end < delete_start) {
+ /**
+ * start_range
+ * || --------- || - |--------- |
+ * delete_range
+ */
+ candidate_ranges.emplace_back(range_start, range_end);
+ } else if (range_start < delete_start) {
/**
* row_range
* || --------|-------- || ----- |
* delete_start delete_end
*/
candidate_ranges.emplace_back(range_start, delete_start);
- } else if (range_end <= delete_start) {
- /**
- * start_range
- * || --------- || ----------- |
- * delete_range
- */
- candidate_ranges.emplace_back(range_start, range_end);
}
+ // range_end > delete_end && range_start > delete_start
start_range++;
} else {
// delete_end < range_end,most of the time, we will use this branch
- if (range_start <= delete_start) {
+ if (processed_range_start_idx < delete_start) {
/**
* row_range_start row_range_end
* || --- | --------- | --- ||
* delete_range
*/
- candidate_ranges.emplace_back(range_start_idx, delete_start);
- range_start_idx = delete_end + 1;
+ candidate_ranges.emplace_back(processed_range_start_idx, delete_start);
}
+ // delete_end is in row_range, so it can assign to processed_range_start_idx
+ processed_range_start_idx = delete_end;
delete_range++;
if (delete_range == delete_row_ranges.end()) {
- candidate_ranges.emplace_back(delete_end + 1, range_end);
+ range_end = _row_ranges[_row_ranges.size() - 1].last_row;
+ if (processed_range_start_idx != range_end) {
+ candidate_ranges.emplace_back(processed_range_start_idx, range_end);
+ }
}
}
}
- }
- if (!candidate_ranges.empty()) {
_row_ranges.assign(candidate_ranges.begin(), candidate_ranges.end());
}
_current_group_reader->set_row_ranges(_row_ranges);
@@ -545,7 +554,6 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
if (!_has_page_index(row_group.columns, page_index)) {
return Status::OK();
}
- // int64_t buffer_size = page_index._column_index_size;
uint8_t col_index_buff[page_index._column_index_size];
int64_t bytes_read = 0;
RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start,
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index aaae115bd3..f5891754e7 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -79,7 +79,7 @@ public:
Status file_metadata(FileMetaData** metadata);
- void merge_delete_row_ranges(const std::vector<RowRange>& delete_row_ranges);
+ void merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges);
int64_t size() const { return _file_reader->size(); }
@@ -149,7 +149,6 @@ private:
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
std::unique_ptr<FileReader> _file_reader = nullptr;
- std::vector<RowRange> _delete_row_ranges;
std::vector<RowRange> _row_ranges;
std::shared_ptr<FileMetaData> _file_metadata;
const tparquet::FileMetaData* _t_metadata;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 041c70438f..9e2f659772 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -29,38 +29,64 @@ namespace doris::vectorized {
const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
const std::string ICEBERG_ROW_POS = "pos";
+IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
+ RuntimeState* state, const TFileScanRangeParams& params)
+ : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) {
+ static const char* iceberg_profile = "IcebergProfile";
+ ADD_TIMER(_profile, iceberg_profile);
+ _iceberg_profile._delete_files_init_time =
+ ADD_CHILD_TIMER(_profile, "DeleteFileInitTime", iceberg_profile);
+ _iceberg_profile._delete_files_read_total_time =
+ ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
+}
+
+IcebergTableReader::~IcebergTableReader() {
+ if (_data_path_conjunct_ctx != nullptr) {
+ _data_path_conjunct_ctx->close(_state);
+ }
+}
+
Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
return _file_format_reader->get_next_block(block, read_rows, eof);
}
+Status IcebergTableReader::set_fill_columns(
+ const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+ partition_columns,
+ const std::unordered_map<std::string, VExprContext*>& missing_columns) {
+ return _file_format_reader->set_fill_columns(partition_columns, missing_columns);
+}
+
Status IcebergTableReader::get_columns(
std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) {
return _file_format_reader->get_columns(name_to_type, missing_cols);
}
-void IcebergTableReader::filter_rows() {
+void IcebergTableReader::filter_rows(const TFileRangeDesc& range) {
if (_cur_delete_file_reader == nullptr) {
return;
}
- auto& table_desc = _params.table_format_params.iceberg_params;
+ SCOPED_TIMER(_iceberg_profile._delete_files_read_total_time);
+ auto& table_desc = range.table_format_params.iceberg_params;
auto& version = table_desc.format_version;
if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
return;
}
bool eof = false;
- std::vector<RowRange> delete_row_ranges;
+ std::set<RowRange> delete_row_ranges;
while (!eof) {
size_t read_rows = 0;
Block block = Block();
- for (const FieldSchema* field : _column_schemas) {
- DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field->type, true);
+ for (const FieldSchema& field : _column_schemas) {
+ DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field.type, true);
MutableColumnPtr data_column = data_type->create_column();
- block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field->name));
+ block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field.name));
}
Status st = _cur_delete_file_reader->get_next_block(&block, &read_rows, &eof);
if (!st.ok() || eof) {
if (!_delete_file_readers.empty()) {
+ eof = false;
_cur_delete_file_reader = std::move(_delete_file_readers.front());
_delete_file_readers.pop_front();
}
@@ -88,40 +114,58 @@ void IcebergTableReader::filter_rows() {
int64_t row_id = delete_row_ids[i];
int64_t row_range_start = row_id;
int64_t row_range_end = row_id;
- // todo: add debug info
- // todo: asure reading delete file data in file_range only
while (i + 1 < num_deleted_ids) {
if (delete_row_ids[i + 1] == delete_row_ids[i] + 1) {
row_range_end = delete_row_ids[i + 1];
i++;
continue;
} else {
- delete_row_ranges.emplace_back(row_range_start, row_range_end);
+ delete_row_ranges.emplace(row_range_start, row_range_end + 1);
row_range_start = ++row_range_end;
break;
}
}
if (i == num_deleted_ids - 1) {
- delete_row_ranges.emplace_back(row_range_start,
- delete_row_ids[num_deleted_ids - 1]);
+ delete_row_ranges.emplace(row_range_start,
+ delete_row_ids[num_deleted_ids - 1] + 1);
}
row_range_start = delete_row_ids[i + 1];
i++;
}
}
}
+ if (VLOG_IS_ON(3)) {
+ if (!delete_row_ranges.empty()) {
+ std::stringstream out;
+ out << "[";
+ for (const RowRange& delete_row_range : delete_row_ranges) {
+ out << " " << delete_row_range.debug_string();
+ }
+ out << " ]";
+ VLOG_NOTICE << "Delete row range info: " << out.str();
+ }
+ }
ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
parquet_reader->merge_delete_row_ranges(delete_row_ranges);
}
-Status IcebergTableReader::init_row_filters() {
- auto& table_desc = _params.table_format_params.iceberg_params;
+Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
+ auto& table_desc = range.table_format_params.iceberg_params;
auto& version = table_desc.format_version;
if (version >= MIN_SUPPORT_DELETE_FILES_VERSION) {
+ SCOPED_TIMER(_iceberg_profile._delete_files_init_time);
auto& delete_file_type = table_desc.content;
auto files = table_desc.delete_files;
if (delete_file_type == POSITON_DELELE) {
// position delete
+ auto row_desc = RowDescriptor(_state->desc_tbl(),
+ std::vector<TupleId>({table_desc.delete_table_tuple_id}),
+ std::vector<bool>({false}));
+ RETURN_IF_ERROR(VExpr::create_expr_tree(
+ _state->obj_pool(), table_desc.file_select_conjunct, &_data_path_conjunct_ctx));
+ RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
+ RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
+ vector<std::string> names;
for (auto& delete_file : files) {
_position_delete_params.low_bound_index = delete_file.position_lower_bound;
_position_delete_params.upper_bound_index = delete_file.position_upper_bound;
@@ -134,17 +178,29 @@ Status IcebergTableReader::init_row_filters() {
ParquetReader* delete_reader = new ParquetReader(
_profile, _params, delete_range, _state->query_options().batch_size,
const_cast<cctz::time_zone*>(&_state->timezone_obj()));
- FileMetaData* metadata = nullptr;
- RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
-
- auto& delete_file_schema = metadata->schema();
- vector<std::string> names;
- for (auto i = 0; i < delete_file_schema.size(); ++i) {
- const FieldSchema* field = delete_file_schema.get_column(i);
- _column_schemas.emplace_back(field);
- names.emplace_back(field->name);
+ if (_delete_file_schema == nullptr) {
+ FileMetaData* metadata = nullptr;
+ RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
+ if (metadata == nullptr) {
+ break;
+ }
+ _delete_file_schema = &metadata->schema();
+ int num_of_col = _delete_file_schema->size();
+ for (auto i = 0; i < num_of_col; ++i) {
+ const FieldSchema* field = _delete_file_schema->get_column(i);
+ _column_schemas.emplace_back(*field);
+ names.emplace_back(field->name);
+ }
}
- Status d_st = delete_reader->init_reader(names, false);
+ DCHECK_EQ(_column_schemas.size(), _delete_file_schema->size());
+ // The expr assure reading delete file data from current file range only
+ Status d_st =
+ delete_reader->init_reader(names, nullptr, _data_path_conjunct_ctx, false);
+ std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+ partition_columns;
+ std::unordered_map<std::string, VExprContext*> missing_columns;
+ delete_reader->set_fill_columns(partition_columns, missing_columns);
+
_delete_file_readers.emplace_back((GenericReader*)delete_reader);
ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
@@ -161,7 +217,7 @@ Status IcebergTableReader::init_row_filters() {
}
}
// todo: equality delete
- filter_rows();
+ filter_rows(range);
return Status::OK();
}
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index e862fe6171..c71ffdaa9c 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -23,22 +23,25 @@
#include "table_format_reader.h"
#include "vec/exec/format/generic_reader.h"
+#include "vec/exprs/vexpr.h"
namespace doris::vectorized {
class IcebergTableReader : public TableFormatReader {
public:
IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
- RuntimeState* state, const TFileScanRangeParams& params)
- : TableFormatReader(file_format_reader),
- _profile(profile),
- _state(state),
- _params(params) {}
- Status init_row_filters();
- void filter_rows() override;
+ RuntimeState* state, const TFileScanRangeParams& params);
+ ~IcebergTableReader() override;
+ Status init_row_filters(const TFileRangeDesc& range);
+ void filter_rows(const TFileRangeDesc& range) override;
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
+ Status set_fill_columns(
+ const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
+ partition_columns,
+ const std::unordered_map<std::string, VExprContext*>& missing_columns) override;
+
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
@@ -52,13 +55,20 @@ public:
};
private:
+ struct IcebergProfile {
+ RuntimeProfile::Counter* _delete_files_init_time;
+ RuntimeProfile::Counter* _delete_files_read_total_time;
+ };
RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
- std::vector<const FieldSchema*> _column_schemas;
+ std::vector<FieldSchema> _column_schemas;
std::deque<std::unique_ptr<GenericReader>> _delete_file_readers;
std::unique_ptr<GenericReader> _cur_delete_file_reader;
PositionDeleteParams _position_delete_params;
+ const FieldDescriptor* _delete_file_schema = nullptr;
+ VExprContext* _data_path_conjunct_ctx = nullptr;
+ IcebergProfile _iceberg_profile;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 3409135dee..188d41a2b9 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -29,6 +29,7 @@ namespace doris::vectorized {
class TableFormatReader : public GenericReader {
public:
TableFormatReader(GenericReader* file_format_reader);
+ ~TableFormatReader() override = default;
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override {
return _file_format_reader->get_next_block(block, read_rows, eof);
}
@@ -37,7 +38,7 @@ public:
return _file_format_reader->get_columns(name_to_type, missing_cols);
}
- virtual void filter_rows() = 0;
+ virtual void filter_rows(const TFileRangeDesc& range) = 0;
protected:
std::string _table_format; // hudi, iceberg
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 6d170a6130..cf61384e81 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -491,11 +491,11 @@ Status VFileScanner::_get_next_reader() {
}
init_status = parquet_reader->init_reader(_file_col_names, _colname_to_value_range,
_push_down_expr);
- if (_params.__isset.table_format_params &&
- _params.table_format_params.table_format_type == "iceberg") {
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "iceberg") {
IcebergTableReader* iceberg_reader = new IcebergTableReader(
(GenericReader*)parquet_reader, _profile, _state, _params);
- iceberg_reader->init_row_filters();
+ iceberg_reader->init_row_filters(range);
_cur_reader.reset((GenericReader*)iceberg_reader);
} else {
_cur_reader.reset((GenericReader*)parquet_reader);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index c42c97041b..4117f20539 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -207,7 +207,7 @@ public class ExternalFileScanNode extends ExternalScanNode {
scanProvider = new HudiScanProvider(hmsTable, desc, columnNameToRange);
break;
case ICEBERG:
- scanProvider = new IcebergScanProvider(hmsTable, desc, columnNameToRange);
+ scanProvider = new IcebergScanProvider(hmsTable, analyzer, desc, columnNameToRange);
break;
case HIVE:
scanProvider = new HiveScanProvider(hmsTable, desc, columnNameToRange);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
index a1a1749c2f..6c8f916a5e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveSplit.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.mapred.FileSplit;
@Data
public class HiveSplit extends FileSplit {
+ public HiveSplit() {}
public HiveSplit(Path file, long start, long length, String[] hosts) {
super(file, start, length, hosts);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index 30b8888fd7..7681d6a66d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -17,8 +17,19 @@
package org.apache.doris.planner.external;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StringLiteral;
+import org.apache.doris.analysis.TableName;
+import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@@ -26,10 +37,15 @@ import org.apache.doris.common.UserException;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.planner.ColumnRange;
import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
+import org.apache.doris.thrift.TIcebergTable;
+import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableFormatFileDesc;
+import org.apache.doris.thrift.TTableType;
+import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
@@ -59,13 +75,20 @@ import java.util.OptionalLong;
public class IcebergScanProvider extends HiveScanProvider {
private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
+ public static final String V2_DELETE_TBL = "iceberg#delete#tbl";
+ public static final String V2_DELETE_DB = "iceberg#delete#db";
+ private static final DeleteFileTempTable scanDeleteTable =
+ new DeleteFileTempTable(TableIf.TableType.HMS_EXTERNAL_TABLE);
+ private final Analyzer analyzer;
- public IcebergScanProvider(HMSExternalTable hmsTable, TupleDescriptor desc,
- Map<String, ColumnRange> columnNameToRange) {
+ public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc,
+ Map<String, ColumnRange> columnNameToRange) {
super(hmsTable, desc, columnNameToRange);
+ this.analyzer = analyzer;
}
- public static void setIcebergParams(ExternalFileScanNode.ParamCreateContext context, IcebergSplit icebergSplit) {
+ public static void setIcebergParams(TFileRangeDesc rangeDesc, IcebergSplit icebergSplit)
+ throws UserException {
TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
tableFormatFileDesc.setTableFormatType(icebergSplit.getTableFormatType().value());
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
@@ -74,6 +97,7 @@ public class IcebergScanProvider extends HiveScanProvider {
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
+ setPathSelectConjunct(fileDesc, icebergSplit);
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
deleteFileDesc.setPath(filter.getDeleteFilePath());
@@ -99,7 +123,20 @@ public class IcebergScanProvider extends HiveScanProvider {
}
}
tableFormatFileDesc.setIcebergParams(fileDesc);
- context.params.setTableFormatParams(tableFormatFileDesc);
+ rangeDesc.setTableFormatParams(tableFormatFileDesc);
+ }
+
+ private static void setPathSelectConjunct(TIcebergFileDesc fileDesc, IcebergSplit icebergSplit)
+ throws UserException {
+ BaseTableRef tableRef = icebergSplit.getDeleteTableRef();
+ fileDesc.setDeleteTableTupleId(tableRef.getDesc().getId().asInt());
+ SlotRef lhs = new SlotRef(tableRef.getName(), DeleteFileTempTable.DATA_FILE_PATH);
+ lhs.analyze(icebergSplit.getAnalyzer());
+ lhs.getDesc().setIsMaterialized(true);
+ StringLiteral rhs = new StringLiteral(icebergSplit.getPath().toUri().toString());
+ BinaryPredicate pathSelectConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
+ pathSelectConjunct.analyze(icebergSplit.getAnalyzer());
+ fileDesc.setFileSelectConjunct(pathSelectConjunct.treeToThrift());
}
@Override
@@ -134,18 +171,27 @@ public class IcebergScanProvider extends HiveScanProvider {
scan = scan.filter(predicate);
}
List<InputSplit> splits = new ArrayList<>();
-
int formatVersion = ((BaseTable) table).operations().current().formatVersion();
+ BaseTableRef tableRef = null;
+ if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
+ TableName fullName = analyzer.getFqTableName(scanDeleteTable.getTableName());
+ fullName.analyze(analyzer);
+ TableRef ref = new TableRef(fullName, fullName.toString(), null);
+ tableRef = new BaseTableRef(ref, scanDeleteTable, scanDeleteTable.getTableName());
+ tableRef.analyze(analyzer);
+ }
for (FileScanTask task : scan.planFiles()) {
for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
String dataFilePath = spitTask.file().path().toString();
IcebergSplit split = new IcebergSplit(new Path(dataFilePath), spitTask.start(),
spitTask.length(), new String[0]);
split.setFormatVersion(formatVersion);
- if (formatVersion == 2) {
+ if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(spitTask));
+ split.setDeleteTableRef(tableRef);
}
split.setTableFormatType(TableFormatType.ICEBERG);
+ split.setAnalyzer(analyzer);
splits.add(split);
}
}
@@ -193,4 +239,32 @@ public class IcebergScanProvider extends HiveScanProvider {
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
return Collections.emptyList();
}
+
+ @Data
+ static class DeleteFileTempTable extends ExternalTable {
+ public static final String DATA_FILE_PATH = "file_path";
+ private final TableName tableName;
+ private final List<Column> fullSchema = new ArrayList<>();
+
+ public DeleteFileTempTable(TableType type) {
+ super(0, V2_DELETE_TBL, null, V2_DELETE_DB, type);
+ this.tableName = new TableName(null, V2_DELETE_DB, V2_DELETE_TBL);
+ Column dataFilePathCol = new Column(DATA_FILE_PATH, PrimitiveType.STRING, true);
+ this.fullSchema.add(dataFilePathCol);
+ }
+
+ @Override
+ public List<Column> getFullSchema() {
+ return fullSchema;
+ }
+
+ @Override
+ public TTableDescriptor toThrift() {
+ TIcebergTable tIcebergTable = new TIcebergTable(V2_DELETE_DB, V2_DELETE_TBL, new HashMap<>());
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
+ fullSchema.size(), 0, getName(), "");
+ tTableDescriptor.setIcebergTable(tIcebergTable);
+ return tTableDescriptor;
+ }
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
index 05ae3c69d3..14fce6caf8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
@@ -17,6 +17,9 @@
package org.apache.doris.planner.external;
+import org.apache.doris.analysis.Analyzer;
+import org.apache.doris.analysis.BaseTableRef;
+
import lombok.Data;
import org.apache.hadoop.fs.Path;
@@ -28,8 +31,10 @@ public class IcebergSplit extends HiveSplit {
super(file, start, length, hosts);
}
+ private Analyzer analyzer;
private String dataFilePath;
private Integer formatVersion;
+ private BaseTableRef deleteTableRef;
private List<IcebergDeleteFileFilter> deleteFileFilters;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
index 3066b7354a..a2ce044bb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/QueryScanProvider.java
@@ -73,9 +73,6 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
context.params.setFileAttributes(getFileAttributes());
}
- if (inputSplit instanceof IcebergSplit) {
- IcebergScanProvider.setIcebergParams(context, (IcebergSplit) inputSplit);
- }
// set hdfs params for hdfs file type.
Map<String, String> locationProperties = getLocationProperties();
if (locationType == TFileType.FILE_HDFS) {
@@ -96,7 +93,6 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
} else if (locationType == TFileType.FILE_S3) {
context.params.setProperties(locationProperties);
}
-
TScanRangeLocations curLocations = newLocations(context.params, backendPolicy);
FileSplitStrategy fileSplitStrategy = new FileSplitStrategy();
@@ -108,7 +104,10 @@ public abstract class QueryScanProvider implements FileScanProviderIf {
pathPartitionKeys, false);
TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath, pathPartitionKeys);
-
+ // external data lake table
+ if (split instanceof IcebergSplit) {
+ IcebergScanProvider.setIcebergParams(rangeDesc, (IcebergSplit) inputSplit);
+ }
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
LOG.debug("assign to backend {} with table split: {} ({}, {}), location: {}",
curLocations.getLocations().get(0).getBackendId(), fileSplit.getPath(), fileSplit.getStart(),
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2725c67acc..0dd3373b74 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -268,7 +268,10 @@ struct TIcebergFileDesc {
1: optional i32 format_version;
// Iceberg file type, 0: data, 1: position delete, 2: equality delete.
2: optional i32 content;
+ // When open a delete file, filter the data file path with the 'file_path' property
3: optional list<TIcebergDeleteFileDesc> delete_files;
+ 4: optional Types.TTupleId delete_table_tuple_id;
+ 5: optional Exprs.TExpr file_select_conjunct;
}
struct TTableFormatFileDesc {
@@ -308,7 +311,7 @@ struct TFileScanRangeParams {
14: optional list<Types.TNetworkAddress> broker_addresses
15: optional TFileAttributes file_attributes
16: optional Exprs.TExpr pre_filter_exprs
- // For data lake table format
+ // Deprecated, For data lake table format
17: optional TTableFormatFileDesc table_format_params
// For csv query task, same the column index in file, order by dest_tuple
18: optional list<i32> column_idxs
@@ -329,6 +332,8 @@ struct TFileRangeDesc {
6: optional list<string> columns_from_path;
// column names from file path, in the same order with columns_from_path
7: optional list<string> columns_from_path_keys;
+ // For data lake table format
+ 8: optional TTableFormatFileDesc table_format_params
}
// TFileScanRange represents a set of descriptions of a file and the rules for reading and converting it.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org