You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/08 14:14:59 UTC
[doris] branch branch-1.2-lts updated: [opt](multi-catalog) cache and reuse position delete rows in iceberg v2 (#15670)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new 48a097b040 [opt](multi-catalog) cache and reuse position delete rows in iceberg v2 (#15670)
48a097b040 is described below
commit 48a097b040bda777dcf49ac30639f9fb8a5fde3f
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Sat Jan 7 22:29:11 2023 +0800
[opt](multi-catalog) cache and reuse position delete rows in iceberg v2 (#15670)
A delete file may belong to multiple data files, and each data file reads the full set of delete files,
so a delete file may be read repeatedly. The delete files can be cached, and multiple data files
can then reuse the content from the first read.
Performance is improved by 60% in the single-threaded case, and by 30% in the multithreaded case.
---
be/src/vec/exec/format/format_common.h | 60 +++++
be/src/vec/exec/format/table/iceberg_reader.cpp | 345 +++++++++++++-----------
be/src/vec/exec/format/table/iceberg_reader.h | 26 +-
be/src/vec/exec/scan/new_file_scan_node.cpp | 4 +-
be/src/vec/exec/scan/new_file_scan_node.h | 2 +
be/src/vec/exec/scan/vfile_scanner.cpp | 10 +-
be/src/vec/exec/scan/vfile_scanner.h | 6 +-
7 files changed, 275 insertions(+), 178 deletions(-)
diff --git a/be/src/vec/exec/format/format_common.h b/be/src/vec/exec/format/format_common.h
index c5570f967d..18277c1ed4 100644
--- a/be/src/vec/exec/format/format_common.h
+++ b/be/src/vec/exec/format/format_common.h
@@ -47,4 +47,64 @@ struct DecimalScaleParams {
}
};
+/**
+ * Key-Value Cache Helper.
+ *
+ * It store a object instance global. User can invoke get method by key and a
+ * object creator callback. If there is a instance stored in cache, then it will
+ * return a void pointer of it, otherwise, it will invoke creator callback, create
+ * a new instance store global, and return it.
+ *
+ * The stored objects will be deleted when deconstructing, so user do not need to
+ * delete the returned pointer.
+ *
+ * User can invoke erase method by key to delete data.
+ *
+ * @tparam KType is the key type
+ */
+template <typename KType>
+class KVCache {
+public:
+ KVCache() = default;
+
+ ~KVCache() {
+ for (auto& kv : _storage) {
+ _delete_fn[kv.first](kv.second);
+ }
+ }
+
+ void erase(const KType& key) {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto it = _storage.find(key);
+ if (it != _storage.end()) {
+ _delete_fn[key](_storage[key]);
+ _storage.erase(key);
+ _delete_fn.erase(key);
+ }
+ }
+
+ template <class T>
+ T* get(const KType& key, const std::function<T*()> createIfNotExists) {
+ std::lock_guard<std::mutex> lock(_lock);
+ auto it = _storage.find(key);
+ if (it != _storage.end()) {
+ return reinterpret_cast<T*>(it->second);
+ } else {
+ T* rawPtr = createIfNotExists();
+ if (rawPtr != nullptr) {
+ _delete_fn[key] = [](void* obj) { delete reinterpret_cast<T*>(obj); };
+ _storage[key] = rawPtr;
+ }
+ return rawPtr;
+ }
+ }
+
+private:
+ using DeleteFn = void (*)(void*);
+
+ std::mutex _lock;
+ std::unordered_map<KType, DeleteFn> _delete_fn;
+ std::unordered_map<KType, void*> _storage;
+};
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index a4e5bbd618..f506597fc4 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -24,18 +24,25 @@
namespace doris::vectorized {
+using DeleteRows = std::vector<int64_t>;
+using DeleteFile = phmap::parallel_flat_hash_map<
+ std::string, std::unique_ptr<DeleteRows>, std::hash<std::string>,
+ std::equal_to<std::string>,
+ std::allocator<std::pair<const std::string, std::unique_ptr<DeleteRows>>>, 8, std::mutex>;
+
const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
const std::string ICEBERG_ROW_POS = "pos";
const std::string ICEBERG_FILE_PATH = "file_path";
IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range)
+ const TFileRangeDesc& range, KVCache<std::string>& kv_cache)
: TableFormatReader(file_format_reader),
_profile(profile),
_state(state),
_params(params),
- _range(range) {
+ _range(range),
+ _kv_cache(kv_cache) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
@@ -72,29 +79,49 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
return Status::OK();
}
auto& delete_file_type = table_desc.content;
- auto files = table_desc.delete_files;
+ const std::vector<TIcebergDeleteFileDesc>& files = table_desc.delete_files;
if (files.empty()) {
return Status::OK();
}
if (delete_file_type == POSITION_DELETE) {
- // position delete
- ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
- RowRange whole_range = parquet_reader->get_whole_range();
- bool init_schema = false;
- std::vector<std::string> delete_file_col_names;
- std::vector<TypeDescriptor> delete_file_col_types;
- std::list<std::vector<int64_t>> delete_rows_list;
- delete_rows_list.resize(files.size());
- int64_t num_delete_rows = 0;
- auto delete_rows_iter = delete_rows_list.begin();
- for (auto& delete_file : files) {
- if (whole_range.last_row <= delete_file.position_lower_bound ||
- whole_range.first_row > delete_file.position_upper_bound) {
- delete_rows_iter++;
- continue;
- }
- SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
- std::vector<int64_t>& delete_rows = *delete_rows_iter;
+ RETURN_IF_ERROR(_position_delete(files));
+ }
+ // todo: equality delete
+ COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
+ return Status::OK();
+}
+
+Status IcebergTableReader::_position_delete(
+ const std::vector<TIcebergDeleteFileDesc>& delete_files) {
+ std::string data_file_path = _range.path;
+ // the path in _range is remove the namenode prefix,
+ // and the file_path in delete file is full path, so we should add it back.
+ if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
+ std::string fs_name = _params.hdfs_params.fs_name;
+ if (!starts_with(data_file_path, fs_name)) {
+ data_file_path = fs_name + data_file_path;
+ }
+ }
+
+ // position delete
+ ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
+ RowRange whole_range = parquet_reader->get_whole_range();
+ bool init_schema = false;
+ std::vector<std::string> delete_file_col_names;
+ std::vector<TypeDescriptor> delete_file_col_types;
+ std::vector<DeleteRows*> delete_rows_array;
+ int64_t num_delete_rows = 0;
+ std::vector<DeleteFile*> erase_data;
+ for (auto& delete_file : delete_files) {
+ if (whole_range.last_row <= delete_file.position_lower_bound ||
+ whole_range.first_row > delete_file.position_upper_bound) {
+ continue;
+ }
+
+ SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
+ Status create_status = Status::OK();
+ DeleteFile* delete_file_cache = _kv_cache.get<
+ DeleteFile>(delete_file.path, [&]() -> DeleteFile* {
TFileRangeDesc delete_range;
delete_range.path = delete_file.path;
delete_range.start_offset = 0;
@@ -106,41 +133,34 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
init_schema = true;
}
- std::string data_file_path = _range.path;
- // the path in _range is remove the namenode prefix,
- // and the file_path in delete file is full path, so we should add it back.
- if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
- std::string fs_name = _params.hdfs_params.fs_name;
- if (!starts_with(data_file_path, fs_name)) {
- data_file_path = fs_name + data_file_path;
- }
- }
- Status init_st =
+ create_status =
delete_reader.init_reader(delete_file_col_names, nullptr, nullptr, false);
- if (init_st.is_end_of_file()) {
- continue;
- } else if (!init_st.ok()) {
- return init_st;
+ if (!create_status.ok()) {
+ return nullptr;
}
+
std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
partition_columns;
std::unordered_map<std::string, VExprContext*> missing_columns;
delete_reader.set_fill_columns(partition_columns, missing_columns);
- bool dictionary_coded = false;
+ bool dictionary_coded = true;
const tparquet::FileMetaData* meta_data = delete_reader.get_meta_data();
for (int i = 0; i < delete_file_col_names.size(); ++i) {
if (delete_file_col_names[i] == ICEBERG_FILE_PATH) {
- // ParquetReader wil return EndOfFile if there's no row group
- auto& column_chunk = meta_data->row_groups[0].columns[i];
- if (column_chunk.__isset.meta_data &&
- column_chunk.meta_data.__isset.dictionary_page_offset) {
- dictionary_coded = true;
+ for (int j = 0; j < meta_data->row_groups.size(); ++j) {
+ auto& column_chunk = meta_data->row_groups[j].columns[i];
+ if (!(column_chunk.__isset.meta_data &&
+ column_chunk.meta_data.__isset.dictionary_page_offset)) {
+ dictionary_coded = false;
+ break;
+ }
}
break;
}
}
+ DeleteFile* position_delete = new DeleteFile;
bool eof = false;
while (!eof) {
Block block = Block();
@@ -162,161 +182,170 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
}
eof = false;
size_t read_rows = 0;
- RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+ create_status = delete_reader.get_next_block(&block, &read_rows, &eof);
+ if (!create_status.ok()) {
+ return nullptr;
+ }
if (read_rows > 0) {
ColumnPtr path_column = block.get_by_name(ICEBERG_FILE_PATH).column;
DCHECK_EQ(path_column->size(), read_rows);
- std::pair<int, int> path_range;
+ ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
+ using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
+ const int64_t* src_data =
+ assert_cast<const ColumnType&>(*pos_column).get_data().data();
+ IcebergTableReader::PositionDeleteRange range;
if (dictionary_coded) {
- path_range = _binary_search(assert_cast<const ColumnDictI32&>(*path_column),
- data_file_path);
+ range = _get_range(assert_cast<const ColumnDictI32&>(*path_column));
} else {
- path_range = _binary_search(assert_cast<const ColumnString&>(*path_column),
- data_file_path);
+ range = _get_range(assert_cast<const ColumnString&>(*path_column));
}
-
- int skip_count = path_range.first;
- int valid_count = path_range.second;
- if (valid_count > 0) {
- // delete position
- ColumnPtr pos_column = block.get_by_name(ICEBERG_ROW_POS).column;
- CHECK_EQ(pos_column->size(), read_rows);
- using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
- const int64_t* src_data =
- assert_cast<const ColumnType&>(*pos_column).get_data().data() +
- skip_count;
- const int64_t* src_data_end = src_data + valid_count;
- const int64_t* cpy_start =
- std::lower_bound(src_data, src_data_end, whole_range.first_row);
- const int64_t* cpy_end =
- std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
- int64_t cpy_count = cpy_end - cpy_start;
-
- if (cpy_count > 0) {
- int64_t origin_size = delete_rows.size();
- delete_rows.resize(origin_size + cpy_count);
- int64_t* dest_position = &delete_rows[origin_size];
- memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
- num_delete_rows += cpy_count;
+ for (int i = 0; i < range.range.size(); ++i) {
+ std::string key = range.data_file_path[i];
+ auto iter = position_delete->find(key);
+ DeleteRows* delete_rows;
+ if (iter == position_delete->end()) {
+ delete_rows = new DeleteRows;
+ std::unique_ptr<DeleteRows> delete_rows_ptr(delete_rows);
+ (*position_delete)[key] = std::move(delete_rows_ptr);
+ } else {
+ delete_rows = iter->second.get();
}
+ const int64_t* cpy_start = src_data + range.range[i].first;
+ const int64_t cpy_count = range.range[i].second - range.range[i].first;
+ int64_t origin_size = delete_rows->size();
+ delete_rows->resize(origin_size + cpy_count);
+ int64_t* dest_position = &(*delete_rows)[origin_size];
+ memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
}
}
}
- delete_rows_iter++;
+ return position_delete;
+ });
+ if (create_status.is_end_of_file()) {
+ continue;
+ } else if (!create_status.ok()) {
+ return create_status;
}
- if (num_delete_rows > 0) {
- for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end();) {
- if (iter->empty()) {
- delete_rows_list.erase(iter++);
- } else {
- iter++;
+
+ DeleteFile& delete_file_map = *((DeleteFile*)delete_file_cache);
+ auto get_value = [&](const auto& v) {
+ DeleteRows* row_ids;
+ // remove those compatibility codes when we finish upgrade phmap.
+ if constexpr (std::is_same_v<const typename DeleteFile::mapped_type&, decltype(v)>) {
+ row_ids = v.get();
+ } else {
+ row_ids = v.second.get();
+ }
+ if (row_ids->size() > 0) {
+ delete_rows_array.emplace_back(row_ids);
+ num_delete_rows += row_ids->size();
+ if (row_ids->front() >= whole_range.first_row &&
+ row_ids->back() < whole_range.last_row) {
+ erase_data.emplace_back(delete_file_cache);
}
}
- SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
- _merge_sort(delete_rows_list, num_delete_rows);
- parquet_reader->set_delete_rows(&_delete_rows);
- COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
- }
+ };
+ delete_file_map.if_contains(data_file_path, get_value);
+ }
+ if (num_delete_rows > 0) {
+ SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
+ _sort_delete_rows(delete_rows_array, num_delete_rows);
+ parquet_reader->set_delete_rows(&_delete_rows);
+ COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
+ }
+ // the delete rows are copy out, we can erase them.
+ for (auto& erase_item : erase_data) {
+ erase_item->erase(data_file_path);
}
- // todo: equality delete
- COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
return Status::OK();
}
-std::pair<int, int> IcebergTableReader::_binary_search(const ColumnDictI32& file_path_column,
- const std::string& data_file_path) {
- size_t read_rows = file_path_column.get_data().size();
-
- int data_file_code = file_path_column.find_code(StringValue(data_file_path));
- if (data_file_code == -2) { // -1 is null code
- return std::make_pair(read_rows, 0);
+IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
+ const ColumnDictI32& file_path_column) {
+ IcebergTableReader::PositionDeleteRange range;
+ int read_rows = file_path_column.get_data().size();
+ int* code_path = const_cast<int*>(file_path_column.get_data().data());
+ int* code_path_start = code_path;
+ int* code_path_end = code_path + read_rows;
+ while (code_path < code_path_end) {
+ int code = code_path[0];
+ int* code_end = std::upper_bound(code_path, code_path_end, code);
+ range.data_file_path.emplace_back(file_path_column.get_value(code).to_string());
+ range.range.emplace_back(
+ std::make_pair(code_path - code_path_start, code_end - code_path_start));
+ code_path = code_end;
}
-
- const int* coded_path = file_path_column.get_data().data();
- const int* coded_path_end = coded_path + read_rows;
- const int* path_start = std::lower_bound(coded_path, coded_path_end, data_file_code);
- const int* path_end = std::lower_bound(path_start, coded_path_end, data_file_code + 1);
- int skip_count = path_start - coded_path;
- int valid_count = path_end - path_start;
-
- return std::make_pair(skip_count, valid_count);
+ return range;
}
-std::pair<int, int> IcebergTableReader::_binary_search(const ColumnString& file_path_column,
- const std::string& data_file_path) {
- const int read_rows = file_path_column.size();
- if (read_rows == 0) {
- return std::make_pair(0, 0);
- }
- StringRef data_file(data_file_path);
-
- int left = 0;
- int right = read_rows - 1;
- if (file_path_column.get_data_at(left) > data_file ||
- file_path_column.get_data_at(right) < data_file) {
- return std::make_pair(read_rows, 0);
- }
- while (left < right) {
- int mid = (left + right) / 2;
- if (file_path_column.get_data_at(mid) < data_file) {
- left = mid;
- } else {
- right = mid;
- }
- }
- if (file_path_column.get_data_at(left) == data_file) {
- int start = left;
- int end = read_rows - 1;
- while (start < end) {
- int pivot = (start + end) / 2;
- if (file_path_column.get_data_at(pivot) > data_file) {
- end = pivot;
+IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
+ const ColumnString& file_path_column) {
+ IcebergTableReader::PositionDeleteRange range;
+ int read_rows = file_path_column.size();
+ int index = 0;
+ while (index < read_rows) {
+ StringRef data_path = file_path_column.get_data_at(index);
+ int left = index;
+ int right = read_rows - 1;
+ while (left < right) {
+ int mid = left + (right - left) / 2;
+ if (file_path_column.get_data_at(mid) > data_path) {
+ right = mid;
} else {
- start = pivot;
+ left = mid;
}
}
- return std::make_pair(left, end - left + 1);
- } else {
- return std::make_pair(read_rows, 0);
+ range.data_file_path.emplace_back(data_path.to_string());
+ range.range.emplace_back(std::make_pair(index, left + 1));
+ index = left + 1;
}
+ return range;
}
-void IcebergTableReader::_merge_sort(std::list<std::vector<int64_t>>& delete_rows_list,
- int64_t num_delete_rows) {
- if (delete_rows_list.empty()) {
+void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& delete_rows_array,
+ int64_t num_delete_rows) {
+ if (delete_rows_array.empty()) {
return;
}
- if (delete_rows_list.size() == 1) {
+ if (delete_rows_array.size() == 1) {
_delete_rows.resize(num_delete_rows);
- memcpy(&_delete_rows[0], &(delete_rows_list.front()[0]), sizeof(int64_t) * num_delete_rows);
+ memcpy(&_delete_rows[0], &((*delete_rows_array.front())[0]),
+ sizeof(int64_t) * num_delete_rows);
return;
}
- if (delete_rows_list.size() == 2) {
+ if (delete_rows_array.size() == 2) {
_delete_rows.resize(num_delete_rows);
- std::merge(delete_rows_list.front().begin(), delete_rows_list.front().end(),
- delete_rows_list.back().begin(), delete_rows_list.back().end(),
+ std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
+ delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
_delete_rows.begin());
return;
}
- // merge sort
- using vec_pair =
- std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::const_iterator>;
- auto cmp = [](const vec_pair& a, const vec_pair& b) { return *a.first > *b.first; };
- std::priority_queue<vec_pair, vector<vec_pair>, decltype(cmp)> pq(cmp);
- for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end(); ++iter) {
- if (iter->size() > 0) {
- pq.push({iter->begin(), iter->end()});
+ using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
+ _delete_rows.resize(num_delete_rows);
+ auto row_id_iter = _delete_rows.begin();
+ auto iter_end = _delete_rows.end();
+ std::vector<vec_pair> rows_array;
+ for (auto rows : delete_rows_array) {
+ if (rows->size() > 0) {
+ rows_array.push_back({rows->begin(), rows->end()});
}
}
- _delete_rows.reserve(num_delete_rows);
- while (!pq.empty()) {
- vec_pair p = pq.top();
- pq.pop();
- _delete_rows.emplace_back(*p.first);
- p.first++;
- if (p.first != p.second) {
- pq.push(p);
+ int array_size = rows_array.size();
+ while (row_id_iter != iter_end) {
+ int min_index = 0;
+ int min = *rows_array[0].first;
+ for (int i = 0; i < array_size; ++i) {
+ if (*rows_array[i].first < min) {
+ min_index = i;
+ min = *rows_array[i].first;
+ }
+ }
+ *row_id_iter++ = min;
+ rows_array[min_index].first++;
+ if (UNLIKELY(rows_array[min_index].first == rows_array[min_index].second)) {
+ rows_array.erase(rows_array.begin() + min_index);
+ array_size--;
}
}
}
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index d689fbcf51..301546b6df 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -29,9 +29,14 @@ namespace doris::vectorized {
class IcebergTableReader : public TableFormatReader {
public:
+ struct PositionDeleteRange {
+ std::vector<std::string> data_file_path;
+ std::vector<std::pair<int, int>> range;
+ };
+
IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range);
+ const TFileRangeDesc& range, KVCache<std::string>& kv_cache);
~IcebergTableReader() override = default;
Status init_row_filters(const TFileRangeDesc& range) override;
@@ -56,31 +61,26 @@ private:
RuntimeProfile::Counter* delete_rows_sort_time;
};
+ Status _position_delete(const std::vector<TIcebergDeleteFileDesc>& delete_files);
+
/**
* https://iceberg.apache.org/spec/#position-delete-files
* The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
* Sorting by file_path allows filter pushdown by file in columnar storage formats.
* Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
- *
- * So, use merge-sort to merge delete rows from different files.
*/
- void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows);
+ void _sort_delete_rows(std::vector<std::vector<int64_t>*>& delete_rows_array,
+ int64_t num_delete_rows);
- /**
- * Delete rows is sorted by file_path, using binary-search to locate the right delete rows for current data file.
- * @return a pair of \<skip_count, valid_count\>,
- * and the range of [skip_count, skip_count + valid_count) is the delete rows for current data file.
- */
- std::pair<int, int> _binary_search(const ColumnDictI32& file_path_column,
- const std::string& data_file_path);
+ PositionDeleteRange _get_range(const ColumnDictI32& file_path_column);
- std::pair<int, int> _binary_search(const ColumnString& file_path_column,
- const std::string& data_file_path);
+ PositionDeleteRange _get_range(const ColumnString& file_path_column);
RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
const TFileRangeDesc& _range;
+ KVCache<std::string>& _kv_cache;
IcebergProfile _iceberg_profile;
std::vector<int64_t> _delete_rows;
};
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 49319f1e6b..59d3ca8453 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -100,8 +100,8 @@ Status NewFileScanNode::_init_scanners(std::list<VScanner*>* scanners) {
}
VScanner* NewFileScanNode::_create_scanner(const TFileScanRange& scan_range) {
- VScanner* scanner =
- new VFileScanner(_state, this, _limit_per_scanner, scan_range, runtime_profile());
+ VScanner* scanner = new VFileScanner(_state, this, _limit_per_scanner, scan_range,
+ runtime_profile(), _kv_cache);
((VFileScanner*)scanner)->prepare(_vconjunct_ctx_ptr.get(), &_colname_to_value_range);
_scanner_pool.add(scanner);
// TODO: Can we remove _conjunct_ctxs and use _vconjunct_ctx_ptr instead?
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h
index 53b11e408d..26eed2310b 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -17,6 +17,7 @@
#pragma once
+#include "vec/exec/format/format_common.h"
#include "vec/exec/scan/vscan_node.h"
namespace doris::vectorized {
@@ -41,5 +42,6 @@ private:
private:
std::vector<TScanRangeParams> _scan_ranges;
+ KVCache<std::string> _kv_cache;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 4043174a4a..d0492ea9c7 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -22,7 +22,6 @@
#include <vec/data_types/data_type_factory.hpp>
-#include "../format/table/iceberg_reader.h"
#include "common/logging.h"
#include "common/utils.h"
#include "exec/arrow/orc_reader.h"
@@ -42,7 +41,8 @@
namespace doris::vectorized {
VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile)
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
+ KVCache<std::string>& kv_cache)
: VScanner(state, static_cast<VScanNode*>(parent), limit),
_params(scan_range.params),
_ranges(scan_range.ranges),
@@ -51,6 +51,7 @@ VFileScanner::VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t
_cur_reader_eof(false),
_mem_pool(std::make_unique<MemPool>()),
_profile(profile),
+ _kv_cache(kv_cache),
_strict_mode(false) {
if (scan_range.params.__isset.strict_mode) {
_strict_mode = scan_range.params.strict_mode;
@@ -492,8 +493,9 @@ Status VFileScanner::_get_next_reader() {
_push_down_expr);
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
- IcebergTableReader* iceberg_reader = new IcebergTableReader(
- (GenericReader*)parquet_reader, _profile, _state, _params, range);
+ IcebergTableReader* iceberg_reader =
+ new IcebergTableReader((GenericReader*)parquet_reader, _profile, _state,
+ _params, range, _kv_cache);
RETURN_IF_ERROR(iceberg_reader->init_row_filters(range));
_cur_reader.reset((GenericReader*)iceberg_reader);
} else {
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 6ac802f60c..28c0e3d347 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -23,6 +23,7 @@
#include "exprs/function_filter.h"
#include "io/file_factory.h"
#include "runtime/tuple.h"
+#include "vec/exec/format/format_common.h"
#include "vec/exec/format/generic_reader.h"
#include "vec/exec/scan/vscanner.h"
@@ -33,7 +34,8 @@ class NewFileScanNode;
class VFileScanner : public VScanner {
public:
VFileScanner(RuntimeState* state, NewFileScanNode* parent, int64_t limit,
- const TFileScanRange& scan_range, RuntimeProfile* profile);
+ const TFileScanRange& scan_range, RuntimeProfile* profile,
+ KVCache<string>& kv_cache);
Status open(RuntimeState* state) override;
@@ -107,6 +109,8 @@ protected:
// Profile
RuntimeProfile* _profile;
+ KVCache<std::string>& _kv_cache;
+
bool _scanner_eof = false;
int _rows = 0;
int _num_of_columns_from_file;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org