Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/30 05:09:48 UTC
[doris] branch branch-1.2-lts updated: [optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 36b61d4826 [optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)
36b61d4826 is described below
commit 36b61d4826eeb1f172d7043a32d471aaeb64e1e1
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Fri Dec 30 08:57:55 2022 +0800
[optimize](multi-catalog) use dictionary encode&filter to process delete files (#15441)
**Optimize**
PR #14470 used `Expr` to filter delete rows so they match the current data file,
but the rows in a position-delete file are [sorted by file_path then position](https://iceberg.apache.org/spec/#position-delete-files)
to optimize filtering rows while scanning, so this PR removes the `Expr` and uses binary search to filter delete rows instead.
In addition, delete files are likely to be dictionary-encoded, and decoding the `file_path`
column into a `ColumnString` is time-consuming, so this PR uses `ColumnDictionary` to read the `file_path` column.
In testing, the performance of Iceberg v2 merge-on-read (MOR) improved by more than 30%.
**Fix Bug**
The lazy-read block may lack the filter column when the whole row group has been filtered out by `Expr`
and batch_eof is generated from the next batch.
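To make the optimization concrete, here is a minimal, self-contained sketch of the idea (illustration only, not the committed code; `DeleteRecord` and `filter_deletes` are hypothetical names): position-delete records are sorted by (file_path, pos), so the records for one data file form a contiguous run that binary search can locate, and the positions inside that run are themselves sorted, so two more lower_bound calls clip them to the scanned row range.

    #include <algorithm>
    #include <cstdint>
    #include <string>
    #include <vector>

    // One record from an Iceberg position-delete file (hypothetical layout).
    struct DeleteRecord {
        std::string file_path; // records are sorted by file_path ...
        int64_t pos;           // ... then by pos, per the Iceberg spec
    };

    // Collect the delete positions that hit `data_file` within [first_row, last_row).
    std::vector<int64_t> filter_deletes(const std::vector<DeleteRecord>& records,
                                        const std::string& data_file,
                                        int64_t first_row, int64_t last_row) {
        // Locate the contiguous run of records belonging to this data file.
        auto lo = std::lower_bound(records.begin(), records.end(), data_file,
                [](const DeleteRecord& r, const std::string& p) { return r.file_path < p; });
        auto hi = std::upper_bound(lo, records.end(), data_file,
                [](const std::string& p, const DeleteRecord& r) { return p < r.file_path; });
        // Positions inside the run are sorted too; clip to the scanned row range.
        auto pos_less = [](const DeleteRecord& r, int64_t v) { return r.pos < v; };
        auto cpy_start = std::lower_bound(lo, hi, first_row, pos_less);
        auto cpy_end = std::lower_bound(cpy_start, hi, last_row, pos_less);
        std::vector<int64_t> out;
        out.reserve(cpy_end - cpy_start);
        for (auto it = cpy_start; it != cpy_end; ++it) out.push_back(it->pos);
        return out;
    }

No per-row `Expr` evaluation is needed; the work per delete file drops to a few O(log n) searches plus one copy of the surviving range.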
---
be/src/vec/columns/column_dictionary.h | 11 ++
be/src/vec/exec/format/parquet/parquet_common.cpp | 56 +++++++
be/src/vec/exec/format/parquet/parquet_common.h | 21 ++-
.../parquet/vparquet_column_chunk_reader.cpp | 4 +
.../format/parquet/vparquet_column_chunk_reader.h | 1 +
.../exec/format/parquet/vparquet_group_reader.cpp | 10 +-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 19 ++-
be/src/vec/exec/format/parquet/vparquet_reader.h | 4 +-
be/src/vec/exec/format/table/iceberg_reader.cpp | 184 ++++++++++++++++-----
be/src/vec/exec/format/table/iceberg_reader.h | 28 +++-
be/src/vec/exec/scan/vfile_scanner.cpp | 4 +-
.../planner/external/IcebergScanProvider.java | 69 --------
.../doris/planner/external/IcebergSplit.java | 2 -
gensrc/thrift/PlanNodes.thrift | 2 +
14 files changed, 274 insertions(+), 141 deletions(-)
diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h
index 729957946f..422e2fdfbb 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -211,6 +211,17 @@ public:
LOG(FATAL) << "should not call replace_column_data_default in ColumnDictionary";
}
+ /**
+ * Insert dictionary items only; the items are appended into _dict.
+ */
+ void insert_many_dict_data(const StringRef* dict_array, uint32_t dict_num) {
+ _dict.reserve(_dict.size() + dict_num);
+ for (uint32_t i = 0; i < dict_num; ++i) {
+ auto value = StringValue(dict_array[i].data, dict_array[i].size);
+ _dict.insert_value(value);
+ }
+ }
+
void insert_many_dict_data(const int32_t* data_array, size_t start_index,
const StringRef* dict_array, size_t data_num,
uint32_t dict_num) override {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.cpp b/be/src/vec/exec/format/parquet/parquet_common.cpp
index 98f0a7de1a..5f8656fcd9 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.cpp
+++ b/be/src/vec/exec/format/parquet/parquet_common.cpp
@@ -18,6 +18,7 @@
#include "parquet_common.h"
#include "util/coding.h"
+#include "vec/columns/column_dictionary.h"
#include "vec/data_types/data_type_nullable.h"
namespace doris::vectorized {
@@ -277,6 +278,36 @@ void Decoder::init(FieldSchema* field_schema, cctz::time_zone* ctz) {
}
}
+Status Decoder::_decode_dict_values(MutableColumnPtr& doris_column,
+ ColumnSelectVector& select_vector) {
+ DCHECK(doris_column->is_column_dictionary());
+ size_t dict_index = 0;
+ ColumnSelectVector::DataReadType read_type;
+ auto& column_data = assert_cast<ColumnDictI32&>(*doris_column).get_data();
+ while (size_t run_length = select_vector.get_next_run(&read_type)) {
+ switch (read_type) {
+ case ColumnSelectVector::CONTENT: {
+ uint32_t* start_index = &_indexes[0];
+ column_data.insert(start_index + dict_index, start_index + dict_index + run_length);
+ dict_index += run_length;
+ break;
+ }
+ case ColumnSelectVector::NULL_DATA: {
+ doris_column->insert_many_defaults(run_length);
+ break;
+ }
+ case ColumnSelectVector::FILTERED_CONTENT: {
+ dict_index += run_length;
+ break;
+ }
+ case ColumnSelectVector::FILTERED_NULL: {
+ break;
+ }
+ }
+ }
+ return Status::OK();
+}
+
Status FixLengthDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
size_t num_values) {
if (num_values * _type_length != length) {
@@ -321,11 +352,26 @@ Status FixLengthDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP
ColumnSelectVector& select_vector) {
size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
if (_has_dict) {
+ if (doris_column->is_column_dictionary() &&
+ assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
+ std::vector<StringRef> dict_items;
+ dict_items.reserve(_dict_items.size());
+ for (int i = 0; i < _dict_items.size(); ++i) {
+ dict_items.emplace_back(_dict_items[i], _type_length);
+ }
+ assert_cast<ColumnDictI32&>(*doris_column)
+ .insert_many_dict_data(&dict_items[0], dict_items.size());
+ }
_indexes.resize(non_null_size);
_index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
} else if (UNLIKELY(_offset + _type_length * non_null_size > _data->size)) {
return Status::IOError("Out-of-bounds access in parquet data decoder");
}
+
+ if (doris_column->is_column_dictionary()) {
+ return _decode_dict_values(doris_column, select_vector);
+ }
+
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
#define DISPATCH(NUMERIC_TYPE, CPP_NUMERIC_TYPE) \
@@ -507,9 +553,19 @@ Status ByteArrayDecoder::decode_values(MutableColumnPtr& doris_column, DataTypeP
ColumnSelectVector& select_vector) {
size_t non_null_size = select_vector.num_values() - select_vector.num_nulls();
if (_has_dict) {
+ if (doris_column->is_column_dictionary() &&
+ assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
+ assert_cast<ColumnDictI32&>(*doris_column)
+ .insert_many_dict_data(&_dict_items[0], _dict_items.size());
+ }
_indexes.resize(non_null_size);
_index_batch_decoder->GetBatch(&_indexes[0], non_null_size);
}
+
+ if (doris_column->is_column_dictionary()) {
+ return _decode_dict_values(doris_column, select_vector);
+ }
+
TypeIndex logical_type = remove_nullable(data_type)->get_type_id();
switch (logical_type) {
case TypeIndex::String:
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index 112be6a223..a3d3e0d8a4 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -178,11 +178,23 @@ public:
}
protected:
+ /**
+ * Decode dictionary-coded values into doris_column. The caller must ensure that doris_column
+ * is a ColumnDictI32 and that the coded values have already been read into _indexes.
+ */
+ Status _decode_dict_values(MutableColumnPtr& doris_column, ColumnSelectVector& select_vector);
+
int32_t _type_length;
Slice* _data = nullptr;
uint32_t _offset = 0;
FieldSchema* _field_schema = nullptr;
std::unique_ptr<DecodeParams> _decode_params = nullptr;
+
+ // For dictionary encoding
+ bool _has_dict = false;
+ std::unique_ptr<uint8_t[]> _dict = nullptr;
+ std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
+ std::vector<uint32_t> _indexes;
};
template <typename DecimalPrimitiveType>
@@ -253,12 +265,9 @@ protected:
if (!_has_dict) _offset += _type_length
tparquet::Type::type _physical_type;
+
// For dictionary encoding
- bool _has_dict = false;
- std::unique_ptr<uint8_t[]> _dict = nullptr;
std::vector<char*> _dict_items;
- std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
- std::vector<uint32_t> _indexes;
};
template <typename Numeric>
@@ -567,11 +576,7 @@ protected:
ColumnSelectVector& select_vector);
// For dictionary encoding
- bool _has_dict = false;
- std::unique_ptr<uint8_t[]> _dict = nullptr;
std::vector<StringRef> _dict_items;
- std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder = nullptr;
- std::vector<uint32_t> _indexes;
};
template <typename DecimalPrimitiveType>
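As context for `_decode_dict_values` above, a hedged, self-contained sketch of the run-length dispatch it performs (the enum and types below are simplified stand-ins, not the Doris API): the select vector yields runs tagged as kept content, kept nulls, filtered content, or filtered nulls, and dictionary codes are copied or skipped one run at a time.

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // Simplified stand-ins for ColumnSelectVector's read types (hypothetical).
    enum class ReadType { CONTENT, NULL_DATA, FILTERED_CONTENT, FILTERED_NULL };
    struct Run {
        ReadType type;
        size_t length;
    };

    // Copy surviving dictionary codes from `indexes` into `out`, run by run.
    void decode_dict_runs(const std::vector<uint32_t>& indexes, const std::vector<Run>& runs,
                          std::vector<int32_t>& out, int32_t null_code = -1) {
        size_t dict_index = 0;
        for (const Run& run : runs) {
            switch (run.type) {
            case ReadType::CONTENT: // kept, non-null: copy `length` codes
                out.insert(out.end(), indexes.begin() + dict_index,
                           indexes.begin() + dict_index + run.length);
                dict_index += run.length;
                break;
            case ReadType::NULL_DATA: // kept, null: no code was stored, emit the null code
                out.insert(out.end(), run.length, null_code);
                break;
            case ReadType::FILTERED_CONTENT: // dropped, non-null: skip the stored codes
                dict_index += run.length;
                break;
            case ReadType::FILTERED_NULL: // dropped, null: nothing was stored
                break;
            }
        }
    }

Because the codes are plain integers, a filtered run costs only an index bump, which is what makes reading `file_path` as a dictionary column cheaper than materializing strings.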
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 2d398b5409..e1c2c0af64 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -176,6 +176,7 @@ Status ColumnChunkReader::_decode_dict_page() {
header.dictionary_page_header.num_values));
_decoders[static_cast<int>(tparquet::Encoding::RLE_DICTIONARY)] = std::move(page_decoder);
+ _has_dict = true;
return Status::OK();
}
@@ -218,6 +219,9 @@ size_t ColumnChunkReader::get_def_levels(level_t* levels, size_t n) {
Status ColumnChunkReader::decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector) {
SCOPED_RAW_TIMER(&_statistics.decode_value_time);
+ if (UNLIKELY(doris_column->is_column_dictionary() && !_has_dict)) {
+ return Status::IOError("Not dictionary coded");
+ }
if (UNLIKELY(_remaining_num_values < select_vector.num_values())) {
return Status::IOError("Decode too many values in current page");
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index f76c05b735..1bb1d017ce 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -168,6 +168,7 @@ private:
Slice _page_data;
std::unique_ptr<uint8_t[]> _decompress_buf;
size_t _decompress_buf_size = 0;
+ bool _has_dict = false;
Decoder* _page_decoder = nullptr;
// Map: encoding -> Decoder
// Plain or Dictionary encoding. If the dictionary grows too big, the encoding will fall back to the plain encoding
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index e3f841f1b1..0d7df17e4b 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -257,8 +257,14 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
// filter data in predicate columns, and remove filter column
if (select_vector.has_filter()) {
- Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
- origin_column_num);
+ if (block->columns() == origin_column_num) {
+ // the whole row group has been filtered by _lazy_read_ctx.vconjunct_ctx, and batch_eof is
+ // generated from the next batch, so the filter column has already been removed.
+ DCHECK_EQ(block->rows(), 0);
+ } else {
+ Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
+ origin_column_num);
+ }
} else {
Block::erase_useless_column(block, origin_column_num);
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 6613b1f987..a27eddce71 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -141,7 +141,6 @@ Status ParquetReader::_open_file() {
_scan_range.file_size, 0, _file_reader));
}
if (_file_metadata == nullptr) {
- SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
RETURN_IF_ERROR(_file_reader->open());
if (_file_reader->size() == 0) {
return Status::EndOfFile("Empty Parquet File");
@@ -341,31 +340,31 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
}
Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- if (_current_group_reader == nullptr) {
+ if (_current_group_reader == nullptr || _row_group_eof) {
if (_read_row_groups.size() > 0) {
RETURN_IF_ERROR(_next_row_group_reader());
} else {
+ _current_group_reader.reset(nullptr);
+ _row_group_eof = true;
*read_rows = 0;
*eof = true;
return Status::OK();
}
}
DCHECK(_current_group_reader != nullptr);
- bool batch_eof = false;
{
SCOPED_RAW_TIMER(&_statistics.column_read_time);
RETURN_IF_ERROR(
- _current_group_reader->next_batch(block, _batch_size, read_rows, &batch_eof));
+ _current_group_reader->next_batch(block, _batch_size, read_rows, &_row_group_eof));
}
- if (batch_eof) {
+ if (_row_group_eof) {
auto column_st = _current_group_reader->statistics();
_column_statistics.merge(column_st);
_statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
- Status st = _next_row_group_reader();
- if (st.is_end_of_file()) {
+ if (_read_row_groups.size() == 0) {
*eof = true;
- } else if (!st.ok()) {
- return st;
+ } else {
+ *eof = false;
}
}
return Status::OK();
@@ -390,6 +389,7 @@ RowGroupReader::PositionDeleteContext ParquetReader::_get_position_delete_ctx(
Status ParquetReader::_next_row_group_reader() {
if (_read_row_groups.empty()) {
+ _row_group_eof = true;
_current_group_reader.reset(nullptr);
return Status::EndOfFile("No next RowGroupReader");
}
@@ -406,6 +406,7 @@ Status ParquetReader::_next_row_group_reader() {
_current_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns,
row_group_index.row_group_id, row_group, _ctz,
position_delete_ctx, _lazy_read_ctx));
+ _row_group_eof = false;
return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges,
_col_offsets);
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 2bfc74f823..19a7b2534d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -154,7 +154,9 @@ private:
std::unique_ptr<FileReader> _file_reader = nullptr;
std::shared_ptr<FileMetaData> _file_metadata;
const tparquet::FileMetaData* _t_metadata;
- std::unique_ptr<RowGroupReader> _current_group_reader;
+ std::unique_ptr<RowGroupReader> _current_group_reader = nullptr;
+ // whether the current row group reader has been read to its end
+ bool _row_group_eof = true;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
std::map<std::string, int> _map_column; // column-name <---> column-index
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
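A hedged sketch of the control flow the new `_row_group_eof` flag enables (simplified stand-in types, not the Doris API): instead of eagerly opening the next row-group reader as soon as a batch reports eof, the reader records the eof and advances lazily on the next `get_next_block` call, keeping the finished reader around so its statistics can still be merged.

    #include <cstddef>
    #include <deque>
    #include <memory>

    // Hypothetical stand-ins for illustration only.
    struct Batch { size_t rows = 0; };
    struct GroupReader {
        size_t batches_left = 3;
        void next_batch(Batch* batch, bool* group_eof) {
            batch->rows = 1;
            *group_eof = (--batches_left == 0);
        }
    };

    struct FileReader {
        std::deque<GroupReader> pending;         // row groups not yet opened
        std::unique_ptr<GroupReader> current;
        bool row_group_eof = true;               // current reader fully consumed

        void get_next_block(Batch* batch, bool* eof) {
            *eof = false;
            if (!current || row_group_eof) {
                if (pending.empty()) {           // nothing left at all
                    current.reset();
                    batch->rows = 0;
                    *eof = true;
                    return;
                }
                // Advance lazily: open the next row group only when data is requested.
                current = std::make_unique<GroupReader>(pending.front());
                pending.pop_front();
                row_group_eof = false;
            }
            current->next_batch(batch, &row_group_eof);
            // On eof, keep `current` alive (e.g. to merge its statistics);
            // the next call performs the actual advance.
            if (row_group_eof) *eof = pending.empty();
        }
    };

This mirrors the change above, where `batch_eof` from `next_batch` is stored into `_row_group_eof` and the call to `_next_row_group_reader()` is deferred to the following invocation.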
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 332aa44e54..1bdae3a6e2 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -26,10 +26,16 @@ namespace doris::vectorized {
const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
const std::string ICEBERG_ROW_POS = "pos";
+const std::string ICEBERG_FILE_PATH = "file_path";
IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
- RuntimeState* state, const TFileScanRangeParams& params)
- : TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) {
+ RuntimeState* state, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range)
+ : TableFormatReader(file_format_reader),
+ _profile(profile),
+ _state(state),
+ _params(params),
+ _range(range) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
@@ -38,12 +44,8 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim
ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
_iceberg_profile.delete_files_read_time =
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
-}
-
-IcebergTableReader::~IcebergTableReader() {
- if (_data_path_conjunct_ctx != nullptr) {
- _data_path_conjunct_ctx->close(_state);
- }
+ _iceberg_profile.delete_rows_sort_time =
+ ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
}
Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
@@ -76,15 +78,6 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
}
if (delete_file_type == POSITION_DELETE) {
// position delete
- SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
- auto row_desc = RowDescriptor(_state->desc_tbl(),
- std::vector<TupleId>({table_desc.delete_table_tuple_id}),
- std::vector<bool>({false}));
- RETURN_IF_ERROR(VExpr::create_expr_tree(_state->obj_pool(), table_desc.file_select_conjunct,
- &_data_path_conjunct_ctx));
- RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
- RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
-
ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
RowRange whole_range = parquet_reader->get_whole_range();
bool init_schema = false;
@@ -100,6 +93,7 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
delete_rows_iter++;
continue;
}
+ SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
std::vector<int64_t>& delete_rows = *delete_rows_iter;
TFileRangeDesc delete_range;
delete_range.path = delete_file.path;
@@ -112,49 +106,90 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
init_schema = true;
}
- RETURN_IF_ERROR(delete_reader.init_reader(delete_file_col_names, nullptr,
- _data_path_conjunct_ctx, false));
+ std::string data_file_path = _range.path;
+ // the path in _range has the namenode prefix removed, while the file_path in the
+ // delete file is a full path, so we should add the prefix back before comparing.
+ if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
+ std::string fs_name = _params.hdfs_params.fs_name;
+ if (!starts_with(data_file_path, fs_name)) {
+ data_file_path = fs_name + data_file_path;
+ }
+ }
+ RETURN_IF_ERROR(
+ delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false));
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
std::unordered_map<std::string, VExprContext*> missing_columns;
delete_reader.set_fill_columns(partition_columns, missing_columns);
bool eof = false;
+ // We can only know whether a parquet file is dictionary-encoded after reading the first block,
+ // so we assume it is dictionary-encoded first, and reset the flag to false if an error is thrown.
+ bool dictionary_coded = true;
while (!eof) {
Block block = Block();
for (int i = 0; i < delete_file_col_names.size(); ++i) {
DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
- delete_file_col_types[i], true);
- MutableColumnPtr data_column = data_type->create_column();
- block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
- delete_file_col_names[i]));
+ delete_file_col_types[i], false);
+ if (delete_file_col_names[i] == ICEBERG_FILE_PATH && dictionary_coded) {
+ // the dictionary data in ColumnDictI32 is referenced by StringValue; the column does not
+ // keep the dictionary data alive in its life cycle, so the upper caller should keep the
+ // dictionary data alive at least as long as the ColumnDictI32.
+ MutableColumnPtr dict_column = ColumnDictI32::create();
+ block.insert(ColumnWithTypeAndName(std::move(dict_column), data_type,
+ delete_file_col_names[i]));
+ } else {
+ MutableColumnPtr data_column = data_type->create_column();
+ block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+ delete_file_col_names[i]));
+ }
}
eof = false;
size_t read_rows = 0;
- RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+ Status st = delete_reader.get_next_block(&block, &read_rows, &eof);
+ if (!st.ok()) {
+ if (st.to_string() == "[IO_ERROR]Not dictionary coded") {
+ dictionary_coded = false;
+ continue;
+ }
+ return st;
+ }
if (read_rows > 0) {
- auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS);
- ColumnPtr pos_column = pos_type_column.column;
- using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
- if (pos_type_column.type->is_nullable()) {
- pos_column = assert_cast<const ColumnNullable&>(*pos_column)
- .get_nested_column_ptr();
+ ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column;
+ DCHECK_EQ(path_column->size(), read_rows);
+ std::pair<int, int> path_range;
+ if (dictionary_coded) {
+ path_range = _binary_search(assert_cast<const ColumnDictI32&>(*path_column),
+ data_file_path);
+ } else {
+ path_range = _binary_search(assert_cast<const ColumnString&>(*path_column),
+ data_file_path);
}
- const int64_t* src_data =
- assert_cast<const ColumnType&>(*pos_column).get_data().data();
- const int64_t* src_data_end = src_data + read_rows;
- const int64_t* cpy_start =
- std::lower_bound(src_data, src_data_end, whole_range.first_row);
- const int64_t* cpy_end =
- std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
- int64_t cpy_count = cpy_end - cpy_start;
-
- if (cpy_count > 0) {
- int64_t origin_size = delete_rows.size();
- delete_rows.resize(origin_size + cpy_count);
- int64_t* dest_position = &delete_rows[origin_size];
- memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
- num_delete_rows += cpy_count;
+
+ int skip_count = path_range.first;
+ int valid_count = path_range.second;
+ if (valid_count > 0) {
+ // delete position
+ ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
+ CHECK_EQ(pos_column->size(), read_rows);
+ using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
+ const int64_t* src_data =
+ assert_cast<const ColumnType&>(*pos_column).get_data().data() +
+ skip_count;
+ const int64_t* src_data_end = src_data + valid_count;
+ const int64_t* cpy_start =
+ std::lower_bound(src_data, src_data_end, whole_range.first_row);
+ const int64_t* cpy_end =
+ std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
+ int64_t cpy_count = cpy_end - cpy_start;
+
+ if (cpy_count > 0) {
+ int64_t origin_size = delete_rows.size();
+ delete_rows.resize(origin_size + cpy_count);
+ int64_t* dest_position = &delete_rows[origin_size];
+ memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
+ num_delete_rows += cpy_count;
+ }
}
}
}
@@ -168,6 +203,7 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
iter++;
}
}
+ SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
_merge_sort(delete_rows_list, num_delete_rows);
parquet_reader->set_delete_rows(&_delete_rows);
COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
@@ -178,6 +214,64 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
return Status::OK();
}
+std::pair<int, int> IcebergTableReader::_binary_search(const ColumnDictI32& file_path_column,
+ const std::string& data_file_path) {
+ size_t read_rows = file_path_column.get_data().size();
+
+ int data_file_code = file_path_column.find_code(StringValue(data_file_path));
+ if (data_file_code == -2) { // not found (-1 is reserved for the null code)
+ return std::make_pair(read_rows, 0);
+ }
+
+ const int* coded_path = file_path_column.get_data().data();
+ const int* coded_path_end = coded_path + read_rows;
+ const int* path_start = std::lower_bound(coded_path, coded_path_end, data_file_code);
+ const int* path_end = std::lower_bound(path_start, coded_path_end, data_file_code + 1);
+ int skip_count = path_start - coded_path;
+ int valid_count = path_end - path_start;
+
+ return std::make_pair(skip_count, valid_count);
+}
+
+std::pair<int, int> IcebergTableReader::_binary_search(const ColumnString& file_path_column,
+ const std::string& data_file_path) {
+ const int read_rows = file_path_column.size();
+ if (read_rows == 0) {
+ return std::make_pair(0, 0);
+ }
+ StringRef data_file(data_file_path);
+
+ int left = 0;
+ int right = read_rows - 1;
+ if (file_path_column.get_data_at(left) > data_file ||
+ file_path_column.get_data_at(right) < data_file) {
+ return std::make_pair(read_rows, 0);
+ }
+ while (left < right) {
+ int mid = (left + right) / 2;
+ if (file_path_column.get_data_at(mid) < data_file) {
+ left = mid + 1; // +1 guarantees progress when right == left + 1
+ } else {
+ right = mid;
+ }
+ }
+ if (file_path_column.get_data_at(left) == data_file) {
+ int start = left;
+ int end = read_rows - 1;
+ while (start < end) {
+ int pivot = (start + end + 1) / 2; // round up so the range always shrinks
+ if (file_path_column.get_data_at(pivot) > data_file) {
+ end = pivot - 1;
+ } else {
+ start = pivot;
+ }
+ }
+ return std::make_pair(left, end - left + 1);
+ } else {
+ return std::make_pair(read_rows, 0);
+ }
+}
+
void IcebergTableReader::_merge_sort(std::list<std::vector<int64_t>>& delete_rows_list,
int64_t num_delete_rows) {
if (delete_rows_list.empty()) {
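The assume-then-fall-back pattern used above for dictionary detection, as a hedged standalone sketch (hypothetical types; the real code signals the condition with the "[IO_ERROR]Not dictionary coded" status): the reader cannot know a column chunk's encoding until the first page is read, so it optimistically asks for dictionary codes and retries the same batch with plain columns when the decoder refuses.

    #include <string>

    // Hypothetical source that only reveals its encoding on the first read.
    struct DeleteFileSource {
        bool dict_encoded;
        int batches_left = 2;
        bool eof() const { return batches_left <= 0; }
        // Fails with "not_dict" if dictionary codes are requested from a
        // plain-encoded file, mirroring the IOError used by the reader above.
        bool next_batch(bool want_dict_codes, std::string* error) {
            if (want_dict_codes && !dict_encoded) {
                *error = "not_dict";
                return false;
            }
            --batches_left;
            return true;
        }
    };

    // Assume dictionary encoding first; on refusal, re-issue the same batch
    // with plain string columns. Returns true on success.
    bool scan_delete_file(DeleteFileSource& src) {
        bool dictionary_coded = true; // optimistic assumption
        while (!src.eof()) {
            std::string error;
            if (!src.next_batch(dictionary_coded, &error)) {
                if (error == "not_dict" && dictionary_coded) {
                    dictionary_coded = false; // retry this batch as plain strings
                    continue;
                }
                return false; // any other error is fatal
            }
            // ... extract file_path / pos columns from the batch here ...
        }
        return true;
    }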
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 0a9d4ef71f..d689fbcf51 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -20,6 +20,7 @@
#include <queue>
#include "table_format_reader.h"
+#include "vec/columns/column_dictionary.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/format/parquet/parquet_common.h"
#include "vec/exprs/vexpr.h"
@@ -29,8 +30,9 @@ namespace doris::vectorized {
class IcebergTableReader : public TableFormatReader {
public:
IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
- RuntimeState* state, const TFileScanRangeParams& params);
- ~IcebergTableReader() override;
+ RuntimeState* state, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range);
+ ~IcebergTableReader() override = default;
Status init_row_filters(const TFileRangeDesc& range) override;
@@ -51,14 +53,34 @@ private:
RuntimeProfile::Counter* num_delete_files;
RuntimeProfile::Counter* num_delete_rows;
RuntimeProfile::Counter* delete_files_read_time;
+ RuntimeProfile::Counter* delete_rows_sort_time;
};
+ /**
+ * https://iceberg.apache.org/spec/#position-delete-files
+ * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
+ * Sorting by file_path allows filter pushdown by file in columnar storage formats.
+ * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
+ *
+ * So, use merge-sort to merge delete rows from different files.
+ */
void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows);
+ /**
+ * Delete rows are sorted by file_path; use binary search to locate the delete rows for the current data file.
+ * @return a pair of \<skip_count, valid_count\>;
+ * the range [skip_count, skip_count + valid_count) holds the delete rows for the current data file.
+ */
+ std::pair<int, int> _binary_search(const ColumnDictI32& file_path_column,
+ const std::string& data_file_path);
+
+ std::pair<int, int> _binary_search(const ColumnString& file_path_column,
+ const std::string& data_file_path);
+
RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
- VExprContext* _data_path_conjunct_ctx = nullptr;
+ const TFileRangeDesc& _range;
IcebergProfile _iceberg_profile;
std::vector<int64_t> _delete_rows;
};
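As a companion to the `_merge_sort` comment above, a hedged sketch of one way to merge the per-file sorted delete-row vectors with a min-heap (illustration only; the committed `_merge_sort` may be implemented differently):

    #include <cstdint>
    #include <list>
    #include <queue>
    #include <vector>

    // K-way merge of sorted delete-row vectors into one sorted vector.
    std::vector<int64_t> merge_delete_rows(const std::list<std::vector<int64_t>>& lists,
                                           int64_t num_delete_rows) {
        struct Item {
            int64_t value;                   // current head of one source
            const std::vector<int64_t>* src; // the source vector
            size_t next_index;               // index of the next element in src
        };
        auto greater = [](const Item& a, const Item& b) { return a.value > b.value; };
        std::priority_queue<Item, std::vector<Item>, decltype(greater)> heap(greater);
        for (const auto& rows : lists) {
            if (!rows.empty()) heap.push(Item{rows[0], &rows, 1});
        }
        std::vector<int64_t> merged;
        merged.reserve(num_delete_rows);
        while (!heap.empty()) {
            Item top = heap.top();
            heap.pop();
            merged.push_back(top.value); // smallest head across all sources
            if (top.next_index < top.src->size()) {
                heap.push(Item{(*top.src)[top.next_index], top.src, top.next_index + 1});
            }
        }
        return merged;
    }

Each of the k sources contributes one heap entry at a time, so the merge runs in O(n log k) without copying any delete-row list twice.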
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index e675583bd3..4043174a4a 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -493,8 +493,8 @@ Status VFileScanner::_get_next_reader() {
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
IcebergTableReader* iceberg_reader = new IcebergTableReader(
- (GenericReader*)parquet_reader, _profile, _state, _params);
- iceberg_reader->init_row_filters(range);
+ (GenericReader*)parquet_reader, _profile, _state, _params, range);
+ RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader.reset((GenericReader*)iceberg_reader);
} else {
_cur_reader.reset((GenericReader*)parquet_reader);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
index e51b39144f..46adddfde6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergScanProvider.java
@@ -18,19 +18,9 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.BaseTableRef;
-import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.StringLiteral;
-import org.apache.doris.analysis.TableName;
-import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.HMSResource;
-import org.apache.doris.catalog.PrimitiveType;
-import org.apache.doris.catalog.TableIf;
-import org.apache.doris.catalog.external.ExternalTable;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
@@ -41,12 +31,8 @@ import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TIcebergDeleteFileDesc;
import org.apache.doris.thrift.TIcebergFileDesc;
-import org.apache.doris.thrift.TIcebergTable;
-import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableFormatFileDesc;
-import org.apache.doris.thrift.TTableType;
-import lombok.Data;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
@@ -76,10 +62,6 @@ import java.util.OptionalLong;
public class IcebergScanProvider extends HiveScanProvider {
private static final int MIN_DELETE_FILE_SUPPORT_VERSION = 2;
- public static final String V2_DELETE_TBL = "iceberg#delete#tbl";
- public static final String V2_DELETE_DB = "iceberg#delete#db";
- private static final DeleteFileTempTable scanDeleteTable =
- new DeleteFileTempTable(TableIf.TableType.HMS_EXTERNAL_TABLE);
private final Analyzer analyzer;
public IcebergScanProvider(HMSExternalTable hmsTable, Analyzer analyzer, TupleDescriptor desc,
@@ -98,7 +80,6 @@ public class IcebergScanProvider extends HiveScanProvider {
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
- setPathSelectConjunct(fileDesc, icebergSplit);
for (IcebergDeleteFileFilter filter : icebergSplit.getDeleteFileFilters()) {
TIcebergDeleteFileDesc deleteFileDesc = new TIcebergDeleteFileDesc();
deleteFileDesc.setPath(filter.getDeleteFilePath());
@@ -127,19 +108,6 @@ public class IcebergScanProvider extends HiveScanProvider {
rangeDesc.setTableFormatParams(tableFormatFileDesc);
}
- private static void setPathSelectConjunct(TIcebergFileDesc fileDesc, IcebergSplit icebergSplit)
- throws UserException {
- BaseTableRef tableRef = icebergSplit.getDeleteTableRef();
- fileDesc.setDeleteTableTupleId(tableRef.getDesc().getId().asInt());
- SlotRef lhs = new SlotRef(tableRef.getName(), DeleteFileTempTable.DATA_FILE_PATH);
- lhs.analyze(icebergSplit.getAnalyzer());
- lhs.getDesc().setIsMaterialized(true);
- StringLiteral rhs = new StringLiteral(icebergSplit.getPath().toUri().toString());
- BinaryPredicate pathSelectConjunct = new BinaryPredicate(BinaryPredicate.Operator.EQ, lhs, rhs);
- pathSelectConjunct.analyze(icebergSplit.getAnalyzer());
- fileDesc.setFileSelectConjunct(pathSelectConjunct.treeToThrift());
- }
-
@Override
public TFileFormatType getFileFormatType() throws DdlException, MetaNotFoundException {
TFileFormatType type;
@@ -173,14 +141,6 @@ public class IcebergScanProvider extends HiveScanProvider {
}
List<InputSplit> splits = new ArrayList<>();
int formatVersion = ((BaseTable) table).operations().current().formatVersion();
- BaseTableRef tableRef = null;
- if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
- TableName fullName = analyzer.getFqTableName(scanDeleteTable.getTableName());
- fullName.analyze(analyzer);
- TableRef ref = new TableRef(fullName, fullName.toString(), null);
- tableRef = new BaseTableRef(ref, scanDeleteTable, scanDeleteTable.getTableName());
- tableRef.analyze(analyzer);
- }
for (FileScanTask task : scan.planFiles()) {
for (FileScanTask spitTask : task.split(128 * 1024 * 1024)) {
String dataFilePath = spitTask.file().path().toString();
@@ -189,7 +149,6 @@ public class IcebergScanProvider extends HiveScanProvider {
split.setFormatVersion(formatVersion);
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(spitTask));
- split.setDeleteTableRef(tableRef);
}
split.setTableFormatType(TableFormatType.ICEBERG);
split.setAnalyzer(analyzer);
@@ -239,32 +198,4 @@ public class IcebergScanProvider extends HiveScanProvider {
public List<String> getPathPartitionKeys() throws DdlException, MetaNotFoundException {
return Collections.emptyList();
}
-
- @Data
- static class DeleteFileTempTable extends ExternalTable {
- public static final String DATA_FILE_PATH = "file_path";
- private final TableName tableName;
- private final List<Column> fullSchema = new ArrayList<>();
-
- public DeleteFileTempTable(TableType type) {
- super(0, V2_DELETE_TBL, null, V2_DELETE_DB, type);
- this.tableName = new TableName(null, V2_DELETE_DB, V2_DELETE_TBL);
- Column dataFilePathCol = new Column(DATA_FILE_PATH, PrimitiveType.STRING, true);
- this.fullSchema.add(dataFilePathCol);
- }
-
- @Override
- public List<Column> getFullSchema() {
- return fullSchema;
- }
-
- @Override
- public TTableDescriptor toThrift() {
- TIcebergTable tIcebergTable = new TIcebergTable(V2_DELETE_DB, V2_DELETE_TBL, new HashMap<>());
- TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ICEBERG_TABLE,
- fullSchema.size(), 0, getName(), "");
- tTableDescriptor.setIcebergTable(tIcebergTable);
- return tTableDescriptor;
- }
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
index 14fce6caf8..b9607a7f00 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/IcebergSplit.java
@@ -18,7 +18,6 @@
package org.apache.doris.planner.external;
import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.BaseTableRef;
import lombok.Data;
import org.apache.hadoop.fs.Path;
@@ -34,7 +33,6 @@ public class IcebergSplit extends HiveSplit {
private Analyzer analyzer;
private String dataFilePath;
private Integer formatVersion;
- private BaseTableRef deleteTableRef;
private List<IcebergDeleteFileFilter> deleteFileFilters;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 613c884a06..ff97361c6e 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -274,7 +274,9 @@ struct TIcebergFileDesc {
2: optional i32 content;
// When opening a delete file, filter the data file path with the 'file_path' property
3: optional list<TIcebergDeleteFileDesc> delete_files;
+ // Deprecated
4: optional Types.TTupleId delete_table_tuple_id;
+ // Deprecated
5: optional Exprs.TExpr file_select_conjunct;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org