Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/24 09:10:06 UTC
[doris] 14/15: [fix](multi-catalog) fix and optimize iceberg v2 reader (#15274)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit f251343fa42878e4e4454723c8d92924eca381ea
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Sat Dec 24 16:02:07 2022 +0800
[fix](multi-catalog) fix and optimize iceberg v2 reader (#15274)
Fix three bugs when reading Iceberg v2 tables:
1. The `delete position` in a `delete file` represents the position of the deleted row in the entire file, while the `read range` in `RowGroupReader` represents the position within the current row group. Therefore, we need to subtract the position of the first row of the current row group from the `delete position` (a simplified sketch follows this list).
2. When only partition columns are read, `RowGroupReader` skips processing the `delete position`.
3. If the `delete position`s delete all rows in a row group, the `read range` is empty, but we read the whole row group in that case.
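The essence of fix 1 is a coordinate shift: delete positions are file-level, while `RowGroupReader` works in row-group-local coordinates. A minimal standalone sketch, with hypothetical names (`to_local_positions`, `first_row_id`) rather than the actual Doris API:

```cpp
#include <cstdint>
#include <vector>

// Shift file-level delete positions into row-group-local coordinates.
// `first_row_id` is the file-level index of the row group's first row;
// positions outside [first_row_id, first_row_id + num_rows) belong to
// other row groups and are dropped.
std::vector<int64_t> to_local_positions(const std::vector<int64_t>& delete_positions,
                                        int64_t first_row_id, int64_t num_rows) {
    std::vector<int64_t> local;
    for (int64_t pos : delete_positions) {
        int64_t local_pos = pos - first_row_id;
        if (local_pos >= 0 && local_pos < num_rows) {
            local.push_back(local_pos);
        }
    }
    return local;
}
```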
Optimize four performance issues:
1. Previously, we converted the `delete position`s into `delete range`s, and then merged the `delete range`s and `read range`s into the final read ranges. That process was tedious and time-consuming; we can merge the `delete position`s and `read range`s directly (a simplified sketch follows this list).
2. The `delete position`s within a `delete file` are ordered, so we can use a merge sort instead of an ordered set.
3. Initialize each `RowGroupReader` when it is actually read, instead of initializing all row groups when opening a `ParquetReader`, to save memory; the same applies to `IcebergReader`.
4. Replace the recursive call of `_do_lazy_read` with a loop.
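Optimizations 1 and 2 boil down to a merge-join over two already-sorted sequences. A simplified standalone sketch of the idea behind `_merge_read_ranges` in the patch below (types and signatures simplified, not the actual Doris code):

```cpp
#include <cstdint>
#include <vector>

struct Range {
    int64_t first_row; // inclusive
    int64_t last_row;  // exclusive
};

// Merge sorted, row-group-local delete positions into the page-index read
// ranges in a single linear pass, splitting each range at every deleted row.
// E.g. ranges {[0,10)} with deletes {3,4,7} yield {[0,3),[5,7),[8,10)}.
std::vector<Range> merge_read_ranges(const std::vector<Range>& row_ranges,
                                     const std::vector<int64_t>& delete_positions) {
    std::vector<Range> read_ranges;
    size_t idx = 0;
    for (const Range& range : row_ranges) {
        int64_t start = range.first_row;
        // Consume every delete position that falls before the end of this range.
        while (idx < delete_positions.size() && delete_positions[idx] < range.last_row) {
            int64_t del = delete_positions[idx++];
            if (del < start) continue; // before this range: nothing to split
            if (start < del) read_ranges.push_back({start, del});
            start = del + 1; // skip the deleted row itself
        }
        if (start < range.last_row) read_ranges.push_back({start, range.last_row});
    }
    return read_ranges;
}
```

Because both inputs are already sorted, no intermediate `delete range` set is needed and the merge runs in O(ranges + deletes).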
---
be/src/service/internal_service.cpp | 2 +-
be/src/vec/exec/format/csv/csv_reader.cpp | 4 +-
be/src/vec/exec/format/csv/csv_reader.h | 4 +-
be/src/vec/exec/format/generic_reader.h | 6 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 4 +-
be/src/vec/exec/format/json/new_json_reader.h | 4 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 4 +-
be/src/vec/exec/format/orc/vorc_reader.h | 4 +-
be/src/vec/exec/format/parquet/parquet_common.h | 2 +-
.../exec/format/parquet/vparquet_column_reader.cpp | 30 +--
.../exec/format/parquet/vparquet_column_reader.h | 19 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 199 +++++++++-----
.../exec/format/parquet/vparquet_group_reader.h | 83 ++++--
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 243 +++++++----------
be/src/vec/exec/format/parquet/vparquet_reader.h | 30 ++-
be/src/vec/exec/format/table/iceberg_reader.cpp | 293 ++++++++++-----------
be/src/vec/exec/format/table/iceberg_reader.h | 30 +--
be/src/vec/exec/format/table/table_format_reader.h | 5 +-
be/test/vec/exec/parquet/parquet_thrift_test.cpp | 12 +-
19 files changed, 512 insertions(+), 466 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 28b9b8f4ca..381782af85 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -478,7 +478,7 @@ void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* c
}
std::vector<std::string> col_names;
std::vector<TypeDescriptor> col_types;
- st = reader->get_parsered_schema(&col_names, &col_types);
+ st = reader->get_parsed_schema(&col_names, &col_types);
if (!st.ok()) {
LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg();
st.to_protobuf(result->mutable_status());
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 2e710ca44f..48f8d84dc7 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -202,8 +202,8 @@ Status CsvReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* n
return Status::OK();
}
-Status CsvReader::get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) {
+Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) {
size_t read_line = 0;
bool is_parse_name = false;
RETURN_IF_ERROR(_prepare_parse(&read_line, &is_parse_name));
diff --git a/be/src/vec/exec/format/csv/csv_reader.h b/be/src/vec/exec/format/csv/csv_reader.h
index 5083c00d2d..5bb14523e3 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -49,8 +49,8 @@ public:
// 1. header_type is empty, get schema from first line.
// 2. header_type is CSV_WITH_NAMES, get schema from first line.
// 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from first two line.
- Status get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) override;
+ Status get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) override;
private:
// used for stream/broker load of csv file.
diff --git a/be/src/vec/exec/format/generic_reader.h b/be/src/vec/exec/format/generic_reader.h
index dd2bdd249c..30e93aacd8 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -39,9 +39,9 @@ public:
return Status::NotSupported("get_columns is not implemented");
}
- virtual Status get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) {
- return Status::NotSupported("get_parser_schema is not implemented for this reader.");
+ virtual Status get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) {
+ return Status::NotSupported("get_parsed_schema is not implemented for this reader.");
}
virtual ~GenericReader() = default;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index 2228f9701d..68a3f089e5 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -139,8 +139,8 @@ Status NewJsonReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}
-Status NewJsonReader::get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) {
+Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) {
RETURN_IF_ERROR(_get_range_params());
RETURN_IF_ERROR(_open_file_reader());
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index 6b003c30fe..5a057d32fe 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -48,8 +48,8 @@ public:
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
- Status get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) override;
+ Status get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) override;
private:
Status _get_range_params();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 4998bc2486..2748160e97 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -176,8 +176,8 @@ Status OrcReader::init_reader(
return Status::OK();
}
-Status OrcReader::get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) {
+Status OrcReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) {
if (_file_reader == nullptr) {
std::unique_ptr<FileReader> inner_reader;
RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path,
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 9b0e4b6a44..98c2fc7c02 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -99,8 +99,8 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
- Status get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) override;
+ Status get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) override;
private:
struct OrcProfile {
diff --git a/be/src/vec/exec/format/parquet/parquet_common.h b/be/src/vec/exec/format/parquet/parquet_common.h
index 611ce969f1..112be6a223 100644
--- a/be/src/vec/exec/format/parquet/parquet_common.h
+++ b/be/src/vec/exec/format/parquet/parquet_common.h
@@ -41,7 +41,7 @@ namespace doris::vectorized {
using level_t = int16_t;
struct RowRange {
- RowRange() {}
+ RowRange() = default;
RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}
int64_t first_row;
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
index c0557c482a..75f43b4730 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.cpp
@@ -19,7 +19,6 @@
#include <common/status.h>
#include <gen_cpp/parquet_types.h>
-#include <vec/columns/columns_number.h>
#include "schema_desc.h"
#include "vec/data_types/data_type_array.h"
@@ -28,7 +27,9 @@
namespace doris::vectorized {
Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
- const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+ const ParquetReadColumn& column,
+ const tparquet::RowGroup& row_group,
+ const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
std::unique_ptr<ParquetColumnReader>& reader,
size_t max_buf_size) {
if (field->type.type == TYPE_MAP || field->type.type == TYPE_STRUCT) {
@@ -36,13 +37,13 @@ Status ParquetColumnReader::create(FileReader* file, FieldSchema* field,
}
if (field->type.type == TYPE_ARRAY) {
tparquet::ColumnChunk chunk = row_group.columns[field->children[0].physical_column_index];
- ArrayColumnReader* array_reader = new ArrayColumnReader(ctz);
+ ArrayColumnReader* array_reader = new ArrayColumnReader(row_ranges, ctz);
array_reader->init_column_metadata(chunk);
RETURN_IF_ERROR(array_reader->init(file, field, &chunk, max_buf_size));
reader.reset(array_reader);
} else {
tparquet::ColumnChunk chunk = row_group.columns[field->physical_column_index];
- ScalarColumnReader* scalar_reader = new ScalarColumnReader(ctz);
+ ScalarColumnReader* scalar_reader = new ScalarColumnReader(row_ranges, ctz);
scalar_reader->init_column_metadata(chunk);
RETURN_IF_ERROR(scalar_reader->init(file, field, &chunk, max_buf_size));
reader.reset(scalar_reader);
@@ -61,23 +62,19 @@ void ParquetColumnReader::init_column_metadata(const tparquet::ColumnChunk& chun
void ParquetColumnReader::_generate_read_ranges(int64_t start_index, int64_t end_index,
std::list<RowRange>& read_ranges) {
- if (_row_ranges->empty()) {
- read_ranges.emplace_back(start_index, end_index);
- return;
- }
int index = _row_range_index;
- while (index < _row_ranges->size()) {
- const RowRange& row_range = (*_row_ranges)[index];
- if (row_range.last_row <= start_index) {
+ while (index < _row_ranges.size()) {
+ const RowRange& read_range = _row_ranges[index];
+ if (read_range.last_row <= start_index) {
index++;
_row_range_index++;
continue;
}
- if (row_range.first_row >= end_index) {
+ if (read_range.first_row >= end_index) {
break;
}
- int64_t start = row_range.first_row < start_index ? start_index : row_range.first_row;
- int64_t end = row_range.last_row < end_index ? row_range.last_row : end_index;
+ int64_t start = read_range.first_row < start_index ? start_index : read_range.first_row;
+ int64_t end = read_range.last_row < end_index ? read_range.last_row : end_index;
read_ranges.emplace_back(start, end);
index++;
}
@@ -216,7 +213,7 @@ Status ScalarColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr
// lazy read
size_t remaining_num_values = 0;
for (auto& range : read_ranges) {
- remaining_num_values = range.last_row - range.first_row;
+ remaining_num_values += range.last_row - range.first_row;
}
if (batch_size >= remaining_num_values &&
select_vector.can_filter_all(remaining_num_values)) {
@@ -330,6 +327,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
data_column = doris_column->assume_mutable();
}
+ // generate array offset
size_t real_batch_size = 0;
size_t num_values = 0;
std::vector<size_t> element_offsets;
@@ -339,7 +337,7 @@ Status ArrayColumnReader::read_column_data(ColumnPtr& doris_column, DataTypePtr&
level_t* definitions = _def_levels_buf.get();
_chunk_reader->get_def_levels(definitions, num_values);
_def_offset = 0;
- // read_range delete_row_range
+
// generate the row ranges that should be read
std::list<RowRange> read_ranges;
_generate_read_ranges(_current_row_index, _current_row_index + real_batch_size, read_ranges);
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
index b7d062e7c6..de0ec185b9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_reader.h
@@ -96,7 +96,8 @@ public:
}
};
- ParquetColumnReader(cctz::time_zone* ctz) : _ctz(ctz) {};
+ ParquetColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+ : _row_ranges(row_ranges), _ctz(ctz) {};
virtual ~ParquetColumnReader() {
if (_stream_reader != nullptr) {
delete _stream_reader;
@@ -106,12 +107,12 @@ public:
virtual Status read_column_data(ColumnPtr& doris_column, DataTypePtr& type,
ColumnSelectVector& select_vector, size_t batch_size,
size_t* read_rows, bool* eof) = 0;
- static Status create(FileReader* file, FieldSchema* field, const tparquet::RowGroup& row_group,
- cctz::time_zone* ctz, std::unique_ptr<ParquetColumnReader>& reader,
- size_t max_buf_size);
+ static Status create(FileReader* file, FieldSchema* field, const ParquetReadColumn& column,
+ const tparquet::RowGroup& row_group,
+ const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz,
+ std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size);
void init_column_metadata(const tparquet::ColumnChunk& chunk);
void add_offset_index(tparquet::OffsetIndex* offset_index) { _offset_index = offset_index; }
- void set_row_ranges(const std::vector<RowRange>* row_ranges) { _row_ranges = row_ranges; };
Statistics statistics() {
return Statistics(_stream_reader->statistics(), _chunk_reader->statistics(),
_decode_null_map_time);
@@ -124,7 +125,7 @@ protected:
BufferedFileStreamReader* _stream_reader;
std::unique_ptr<ParquetColumnMetadata> _metadata;
- const std::vector<RowRange>* _row_ranges;
+ const std::vector<RowRange>& _row_ranges;
cctz::time_zone* _ctz;
std::unique_ptr<ColumnChunkReader> _chunk_reader;
tparquet::OffsetIndex* _offset_index;
@@ -135,7 +136,8 @@ protected:
class ScalarColumnReader : public ParquetColumnReader {
public:
- ScalarColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
+ ScalarColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+ : ParquetColumnReader(row_ranges, ctz) {};
~ScalarColumnReader() override { close(); };
Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
size_t max_buf_size);
@@ -150,7 +152,8 @@ public:
class ArrayColumnReader : public ParquetColumnReader {
public:
- ArrayColumnReader(cctz::time_zone* ctz) : ParquetColumnReader(ctz) {};
+ ArrayColumnReader(const std::vector<RowRange>& row_ranges, cctz::time_zone* ctz)
+ : ParquetColumnReader(row_ranges, ctz) {};
~ArrayColumnReader() override { close(); };
Status init(FileReader* file, FieldSchema* field, tparquet::ColumnChunk* chunk,
size_t max_buf_size);
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 42baf3a283..24441896a9 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -23,26 +23,31 @@
namespace doris::vectorized {
+const std::vector<int64_t> RowGroupReader::NO_DELETE = {};
+
RowGroupReader::RowGroupReader(doris::FileReader* file_reader,
const std::vector<ParquetReadColumn>& read_columns,
- const RowGroupIndex& row_group_idx,
- const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+ const int32_t row_group_id, const tparquet::RowGroup& row_group,
+ cctz::time_zone* ctz,
+ const PositionDeleteContext& position_delete_ctx,
const LazyReadContext& lazy_read_ctx)
: _file_reader(file_reader),
_read_columns(read_columns),
- _row_group_idx(row_group_idx),
+ _row_group_id(row_group_id),
_row_group_meta(row_group),
_remaining_rows(row_group.num_rows),
_ctz(ctz),
+ _position_delete_ctx(position_delete_ctx),
_lazy_read_ctx(lazy_read_ctx) {}
RowGroupReader::~RowGroupReader() {
_column_readers.clear();
}
-Status RowGroupReader::init(const FieldDescriptor& schema,
+Status RowGroupReader::init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
std::unordered_map<int, tparquet::OffsetIndex>& col_offsets) {
- if (_read_columns.size() == 0) {
+ _merge_read_ranges(row_ranges);
+ if (_read_columns.empty()) {
// Query task that only select columns in path.
return Status::OK();
}
@@ -52,15 +57,15 @@ Status RowGroupReader::init(const FieldDescriptor& schema,
for (auto& read_col : _read_columns) {
auto field = const_cast<FieldSchema*>(schema.get_column(read_col._file_slot_name));
std::unique_ptr<ParquetColumnReader> reader;
- RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, _row_group_meta, _ctz,
- reader, max_buf_size));
+ RETURN_IF_ERROR(ParquetColumnReader::create(_file_reader, field, read_col, _row_group_meta,
+ _read_ranges, _ctz, reader, max_buf_size));
auto col_iter = col_offsets.find(read_col._parquet_col_id);
if (col_iter != col_offsets.end()) {
tparquet::OffsetIndex oi = col_iter->second;
reader->add_offset_index(&oi);
}
if (reader == nullptr) {
- VLOG_DEBUG << "Init row group(" << _row_group_idx.row_group_id << ") reader failed";
+ VLOG_DEBUG << "Init row group(" << _row_group_id << ") reader failed";
return Status::Corruption("Init row group reader failed");
}
_column_readers[read_col._file_slot_name] = std::move(reader);
@@ -68,17 +73,11 @@ Status RowGroupReader::init(const FieldDescriptor& schema,
return Status::OK();
}
-void RowGroupReader::set_row_ranges(const std::vector<doris::vectorized::RowRange>& row_ranges) {
- for (auto& read_col : _read_columns) {
- _column_readers[read_col._file_slot_name]->set_row_ranges(&row_ranges);
- }
-}
-
Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_rows,
- bool* _batch_eof) {
+ bool* batch_eof) {
// Process external table query task that select columns are all from path.
if (_read_columns.empty()) {
- RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, _batch_eof));
+ RETURN_IF_ERROR(_read_empty_batch(batch_size, read_rows, batch_eof));
RETURN_IF_ERROR(
_fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
@@ -90,11 +89,11 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
}
if (_lazy_read_ctx.can_lazy_read) {
// call _do_lazy_read recursively when current batch is skipped
- return _do_lazy_read(block, batch_size, read_rows, _batch_eof);
+ return _do_lazy_read(block, batch_size, read_rows, batch_eof);
} else {
ColumnSelectVector run_length_vector;
RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.all_read_columns, batch_size,
- read_rows, _batch_eof, run_length_vector));
+ read_rows, batch_eof, run_length_vector));
RETURN_IF_ERROR(
_fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
@@ -106,8 +105,39 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
}
}
+void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
+ // row_ranges is generated from page index, and the row index begins with 0 in each row group.
+ // _position_delete_ctx is generated from delete file, and the row index begins with 0 in parquet file
+ for (auto& range : row_ranges) {
+ int64_t start_row_id = range.first_row;
+ while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
+ const int64_t& delete_row_id =
+ _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
+ _position_delete_ctx.first_row_id;
+ if (delete_row_id < range.first_row) {
+ _position_delete_ctx.index++;
+ } else if (delete_row_id < range.last_row) {
+ if (start_row_id < delete_row_id) {
+ _read_ranges.emplace_back(start_row_id, delete_row_id);
+ }
+ start_row_id = delete_row_id + 1;
+ _position_delete_ctx.index++;
+ } else { // delete_row_id >= range.last_row
+ if (start_row_id < range.last_row) {
+ _read_ranges.emplace_back(start_row_id, range.last_row);
+ start_row_id = range.last_row + 1;
+ }
+ break;
+ }
+ }
+ if (start_row_id < range.last_row) {
+ _read_ranges.emplace_back(start_row_id, range.last_row);
+ }
+ }
+}
+
Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns,
- size_t batch_size, size_t* read_rows, bool* _batch_eof,
+ size_t batch_size, size_t* read_rows, bool* batch_eof,
ColumnSelectVector& select_vector) {
size_t batch_read_rows = 0;
bool has_eof = false;
@@ -138,57 +168,74 @@ Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::st
col_idx++;
}
*read_rows = batch_read_rows;
- *_batch_eof = has_eof;
+ *batch_eof = has_eof;
return Status::OK();
}
Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* read_rows,
bool* batch_eof) {
- // read predicate columns
+ std::unique_ptr<ColumnSelectVector> select_vector_ptr = nullptr;
size_t pre_read_rows;
bool pre_eof;
- ColumnSelectVector run_length_vector;
- RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
- &pre_read_rows, &pre_eof, run_length_vector));
- RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
- _lazy_read_ctx.predicate_partition_columns));
- RETURN_IF_ERROR(
- _fill_missing_columns(block, pre_read_rows, _lazy_read_ctx.predicate_missing_columns));
- // generate filter vector
- if (_lazy_read_ctx.resize_first_column) {
- // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
- // The following process may be tricky and time-consuming, but we have no other way.
- block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
- }
size_t origin_column_num = block->columns();
int filter_column_id = -1;
- RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
- ColumnPtr& sv = block->get_by_position(filter_column_id).column;
- if (_lazy_read_ctx.resize_first_column) {
- // We have to clean the first column to insert right data.
- block->get_by_position(0).column->assume_mutable()->clear();
- }
-
- // build filter map
- bool can_filter_all = false;
- const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all);
- ColumnSelectVector select_vector(filter_map, pre_read_rows, can_filter_all);
- if (select_vector.filter_all() && !pre_eof) {
- // If continuous batches are skipped, we can cache them to skip a whole page
- _cached_filtered_rows += pre_read_rows;
- for (auto& col : _lazy_read_ctx.predicate_columns) {
- // clean block to read predicate columns
- block->get_by_name(col).column->assume_mutable()->clear();
+ while (true) {
+ // read predicate columns
+ pre_read_rows = 0;
+ pre_eof = false;
+ ColumnSelectVector run_length_vector;
+ RETURN_IF_ERROR(_read_column_data(block, _lazy_read_ctx.predicate_columns, batch_size,
+ &pre_read_rows, &pre_eof, run_length_vector));
+ if (pre_read_rows == 0) {
+ DCHECK_EQ(pre_eof, true);
+ break;
}
- for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
- block->get_by_name(col.first).column->assume_mutable()->clear();
+ RETURN_IF_ERROR(_fill_partition_columns(block, pre_read_rows,
+ _lazy_read_ctx.predicate_partition_columns));
+ RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
+ _lazy_read_ctx.predicate_missing_columns));
+ // generate filter vector
+ if (_lazy_read_ctx.resize_first_column) {
+ // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
+ // The following process may be tricky and time-consuming, but we have no other way.
+ block->get_by_position(0).column->assume_mutable()->resize(pre_read_rows);
}
- for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
- block->get_by_name(col.first).column->assume_mutable()->clear();
+ RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &filter_column_id));
+ ColumnPtr& sv = block->get_by_position(filter_column_id).column;
+ if (_lazy_read_ctx.resize_first_column) {
+ // We have to clean the first column to insert right data.
+ block->get_by_position(0).column->assume_mutable()->clear();
}
- Block::erase_useless_column(block, origin_column_num);
- return _do_lazy_read(block, batch_size, read_rows, batch_eof);
+
+ // build filter map
+ bool can_filter_all = false;
+ const uint8_t* filter_map = _build_filter_map(sv, pre_read_rows, &can_filter_all);
+ select_vector_ptr.reset(new ColumnSelectVector(filter_map, pre_read_rows, can_filter_all));
+ if (select_vector_ptr->filter_all() && !pre_eof) {
+ // If continuous batches are skipped, we can cache them to skip a whole page
+ _cached_filtered_rows += pre_read_rows;
+ for (auto& col : _lazy_read_ctx.predicate_columns) {
+ // clean block to read predicate columns
+ block->get_by_name(col).column->assume_mutable()->clear();
+ }
+ for (auto& col : _lazy_read_ctx.predicate_partition_columns) {
+ block->get_by_name(col.first).column->assume_mutable()->clear();
+ }
+ for (auto& col : _lazy_read_ctx.predicate_missing_columns) {
+ block->get_by_name(col.first).column->assume_mutable()->clear();
+ }
+ Block::erase_useless_column(block, origin_column_num);
+ } else {
+ break;
+ }
+ }
+ if (select_vector_ptr == nullptr) {
+ DCHECK_EQ(pre_read_rows + _cached_filtered_rows, 0);
+ *read_rows = 0;
+ *batch_eof = true;
}
+
+ ColumnSelectVector& select_vector = *select_vector_ptr;
std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
if (_cached_filtered_rows != 0) {
_rebuild_select_vector(select_vector, rebuild_filter_map, pre_read_rows);
@@ -345,15 +392,37 @@ Status RowGroupReader::_fill_missing_columns(
return Status::OK();
}
-Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof) {
- if (batch_size < _remaining_rows) {
- *read_rows = batch_size;
- _remaining_rows -= batch_size;
- *_batch_eof = false;
+Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof) {
+ if (_position_delete_ctx.has_filter) {
+ int64_t start_row_id = _position_delete_ctx.current_row_id;
+ int64_t end_row_id = std::min(_position_delete_ctx.current_row_id + (int64_t)batch_size,
+ _position_delete_ctx.last_row_id);
+ int64_t num_delete_rows = 0;
+ while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
+ const int64_t& delete_row_id =
+ _position_delete_ctx.delete_rows[_position_delete_ctx.index];
+ if (delete_row_id < start_row_id) {
+ _position_delete_ctx.index++;
+ } else if (delete_row_id < end_row_id) {
+ num_delete_rows++;
+ _position_delete_ctx.index++;
+ } else { // delete_row_id >= end_row_id
+ break;
+ }
+ }
+ *read_rows = end_row_id - start_row_id - num_delete_rows;
+ _position_delete_ctx.current_row_id = end_row_id;
+ *batch_eof = _position_delete_ctx.current_row_id == _position_delete_ctx.last_row_id;
} else {
- *read_rows = _remaining_rows;
- _remaining_rows = 0;
- *_batch_eof = true;
+ if (batch_size < _remaining_rows) {
+ *read_rows = batch_size;
+ _remaining_rows -= batch_size;
+ *batch_eof = false;
+ } else {
+ *read_rows = _remaining_rows;
+ _remaining_rows = 0;
+ *batch_eof = true;
+ }
}
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index ef73eba21c..53e8a052c0 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -25,16 +25,18 @@
namespace doris::vectorized {
-struct RowGroupIndex {
- int32_t row_group_id;
- int32_t first_row;
- int32_t last_row;
- RowGroupIndex(int32_t id, int32_t first, int32_t last)
- : row_group_id(id), first_row(first), last_row(last) {}
-};
-
class RowGroupReader {
public:
+ static const std::vector<int64_t> NO_DELETE;
+
+ struct RowGroupIndex {
+ int32_t row_group_id;
+ int64_t first_row;
+ int64_t last_row;
+ RowGroupIndex(int32_t id, int64_t first, int64_t last)
+ : row_group_id(id), first_row(first), last_row(last) {}
+ };
+
struct LazyReadContext {
VExprContext* vconjunct_ctx = nullptr;
bool can_lazy_read = false;
@@ -56,26 +58,66 @@ public:
std::unordered_map<std::string, VExprContext*> missing_columns;
};
+ /**
+ * Support row-level delete in iceberg:
+ * https://iceberg.apache.org/spec/#position-delete-files
+ */
+ struct PositionDeleteContext {
+ // the filtered rows in current row group
+ const std::vector<int64_t>& delete_rows;
+ // the first row id of current row group in parquet file
+ const int64_t first_row_id;
+ // the number of rows in current row group
+ const int64_t num_rows;
+ const int64_t last_row_id;
+ // current row id to read in the row group
+ int64_t current_row_id;
+ // start index in delete_rows
+ const int64_t start_index;
+ // end index in delete_rows
+ const int64_t end_index;
+ // current index in delete_rows
+ int64_t index;
+ const bool has_filter;
+
+ PositionDeleteContext(const std::vector<int64_t>& delete_rows, const int64_t num_rows,
+ const int64_t first_row_id, const int64_t start_index,
+ const int64_t end_index)
+ : delete_rows(delete_rows),
+ first_row_id(first_row_id),
+ num_rows(num_rows),
+ last_row_id(first_row_id + num_rows),
+ current_row_id(first_row_id),
+ start_index(start_index),
+ end_index(end_index),
+ index(start_index),
+ has_filter(end_index > start_index) {}
+
+ PositionDeleteContext(const int64_t num_rows, const int64_t first_row)
+ : PositionDeleteContext(NO_DELETE, num_rows, first_row, 0, 0) {}
+
+ PositionDeleteContext(const PositionDeleteContext& filter) = default;
+ };
+
RowGroupReader(doris::FileReader* file_reader,
- const std::vector<ParquetReadColumn>& read_columns,
- const RowGroupIndex& _row_group_idx, const tparquet::RowGroup& row_group,
- cctz::time_zone* ctz, const LazyReadContext& lazy_read_ctx);
+ const std::vector<ParquetReadColumn>& read_columns, const int32_t row_group_id,
+ const tparquet::RowGroup& row_group, cctz::time_zone* ctz,
+ const PositionDeleteContext& position_delete_ctx,
+ const LazyReadContext& lazy_read_ctx);
~RowGroupReader();
- Status init(const FieldDescriptor& schema,
+ Status init(const FieldDescriptor& schema, std::vector<RowRange>& row_ranges,
std::unordered_map<int, tparquet::OffsetIndex>& col_offsets);
- Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* _batch_eof);
- int64_t lazy_read_filtered_rows() { return _lazy_read_filtered_rows; }
- const RowGroupIndex& index() { return _row_group_idx; }
- void set_row_ranges(const std::vector<doris::vectorized::RowRange>& row_ranges);
+ Status next_batch(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
int64_t lazy_read_filtered_rows() const { return _lazy_read_filtered_rows; }
ParquetColumnReader::Statistics statistics();
private:
- Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* _batch_eof);
+ void _merge_read_ranges(std::vector<RowRange>& row_ranges);
+ Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* batch_eof);
Status _read_column_data(Block* block, const std::vector<std::string>& columns,
- size_t batch_size, size_t* read_rows, bool* _batch_eof,
+ size_t batch_size, size_t* read_rows, bool* batch_eof,
ColumnSelectVector& select_vector);
Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
const uint8_t* _build_filter_map(ColumnPtr& sv, size_t num_rows, bool* can_filter_all);
@@ -92,10 +134,13 @@ private:
doris::FileReader* _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
const std::vector<ParquetReadColumn>& _read_columns;
- const RowGroupIndex& _row_group_idx;
+ const int32_t _row_group_id;
const tparquet::RowGroup& _row_group_meta;
int64_t _remaining_rows;
cctz::time_zone* _ctz;
+ PositionDeleteContext _position_delete_ctx;
+ // merge the row ranges generated from page index and position delete.
+ std::vector<RowRange> _read_ranges;
const LazyReadContext& _lazy_read_ctx;
int64_t _lazy_read_filtered_rows = 0;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index c8d4f6b3c4..6613b1f987 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -19,6 +19,7 @@
#include <algorithm>
+#include "common/status.h"
#include "io/file_factory.h"
#include "parquet_pred_cmp.h"
#include "parquet_thrift_util.h"
@@ -150,24 +151,12 @@ Status ParquetReader::_open_file() {
return Status::OK();
}
-Status ParquetReader::file_metadata(FileMetaData** metadata) {
- Status open_status = _open_file();
- if (!open_status.ok()) {
- return open_status;
- }
- *metadata = _file_metadata.get();
- return Status::OK();
-}
-
Status ParquetReader::init_reader(
const std::vector<std::string>& column_names,
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
VExprContext* vconjunct_ctx, bool filter_groups) {
SCOPED_RAW_TIMER(&_statistics.parse_meta_time);
- Status open_status = _open_file();
- if (!open_status.ok()) {
- return open_status;
- }
+ RETURN_IF_ERROR(_open_file());
_column_names = &column_names;
_t_metadata = &_file_metadata->to_thrift();
_total_groups = _t_metadata->row_groups.size();
@@ -182,7 +171,7 @@ Status ParquetReader::init_reader(
RETURN_IF_ERROR(_init_read_columns());
// build column predicates for column lazy read
_lazy_read_ctx.vconjunct_ctx = vconjunct_ctx;
- RETURN_IF_ERROR(_init_row_group_readers(filter_groups));
+ RETURN_IF_ERROR(_init_row_groups(filter_groups));
return Status::OK();
}
@@ -317,20 +306,9 @@ std::unordered_map<std::string, TypeDescriptor> ParquetReader::get_name_to_type(
return map;
}
-Status ParquetReader::get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) {
- if (_file_reader == nullptr) {
- RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, _scan_params, _scan_range.path,
- _scan_range.start_offset,
- _scan_range.file_size, 0, _file_reader));
- }
- if (_file_metadata == nullptr) {
- RETURN_IF_ERROR(_file_reader->open());
- if (_file_reader->size() == 0) {
- return Status::EndOfFile("Empty Parquet File");
- }
- RETURN_IF_ERROR(parse_thrift_footer(_file_reader.get(), _file_metadata));
- }
+Status ParquetReader::get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) {
+ RETURN_IF_ERROR(_open_file());
_t_metadata = &_file_metadata->to_thrift();
_total_groups = _t_metadata->row_groups.size();
@@ -362,143 +340,90 @@ Status ParquetReader::get_columns(std::unordered_map<std::string, TypeDescriptor
return Status::OK();
}
-void ParquetReader::merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges) {
- if (_row_ranges.empty()) {
- _current_group_reader->set_row_ranges(_row_ranges);
- return;
- }
- if (!delete_row_ranges.empty()) {
- std::vector<RowRange> candidate_ranges;
- auto start_range = _row_ranges.begin();
- auto delete_range = delete_row_ranges.begin();
- int64_t processed_range_start_idx = start_range->first_row;
- while (start_range != _row_ranges.end() && delete_range != delete_row_ranges.end()) {
- int64_t delete_start = delete_range->first_row;
- int64_t delete_end = delete_range->last_row;
- int64_t range_start = start_range->first_row;
- int64_t range_end = start_range->last_row;
- if (delete_end > range_end) {
- if (range_start < processed_range_start_idx) {
- // rows before processed_range_start_idx have been processed
- range_start = processed_range_start_idx;
- }
- if (range_end < delete_start) {
- /**
- * start_range
- * || --------- || - |--------- |
- * delete_range
- */
- candidate_ranges.emplace_back(range_start, range_end);
- } else if (range_start < delete_start) {
- /**
- * row_range
- * || --------|-------- || ----- |
- * delete_start delete_end
- */
- candidate_ranges.emplace_back(range_start, delete_start);
- }
- // range_end > delete_end && range_start > delete_start
- start_range++;
- } else {
- // delete_end < range_end,most of the time, we will use this branch
- if (processed_range_start_idx < delete_start) {
- /**
- * row_range_start row_range_end
- * || --- | --------- | --- ||
- * delete_range
- */
- candidate_ranges.emplace_back(processed_range_start_idx, delete_start);
- }
- // delete_end is in row_range, so it can assign to processed_range_start_idx
- processed_range_start_idx = delete_end;
- delete_range++;
- if (delete_range == delete_row_ranges.end()) {
- range_end = _row_ranges[_row_ranges.size() - 1].last_row;
- if (processed_range_start_idx != range_end) {
- candidate_ranges.emplace_back(processed_range_start_idx, range_end);
- }
- }
- }
- }
- _row_ranges.assign(candidate_ranges.begin(), candidate_ranges.end());
- }
- _current_group_reader->set_row_ranges(_row_ranges);
-}
-
Status ParquetReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
- int32_t num_of_readers = _row_group_readers.size();
- DCHECK(num_of_readers <= _read_row_groups.size());
- if (_read_row_groups.empty()) {
- *eof = true;
- return Status::OK();
+ if (_current_group_reader == nullptr) {
+ if (_read_row_groups.size() > 0) {
+ RETURN_IF_ERROR(_next_row_group_reader());
+ } else {
+ *read_rows = 0;
+ *eof = true;
+ return Status::OK();
+ }
}
- bool _batch_eof = false;
+ DCHECK(_current_group_reader != nullptr);
+ bool batch_eof = false;
{
SCOPED_RAW_TIMER(&_statistics.column_read_time);
RETURN_IF_ERROR(
- _current_group_reader->next_batch(block, _batch_size, read_rows, &_batch_eof));
+ _current_group_reader->next_batch(block, _batch_size, read_rows, &batch_eof));
}
- if (_batch_eof) {
+ if (batch_eof) {
auto column_st = _current_group_reader->statistics();
_column_statistics.merge(column_st);
_statistics.lazy_read_filtered_rows += _current_group_reader->lazy_read_filtered_rows();
- if (!_next_row_group_reader()) {
+ Status st = _next_row_group_reader();
+ if (st.is_end_of_file()) {
*eof = true;
+ } else if (!st.ok()) {
+ return st;
}
}
- VLOG_DEBUG << "ParquetReader::get_next_block: " << block->rows();
return Status::OK();
}
-bool ParquetReader::_next_row_group_reader() {
- if (_row_group_readers.empty()) {
- return false;
- }
- _current_group_reader = _row_group_readers.front();
- _row_group_readers.pop_front();
- return true;
+RowGroupReader::PositionDeleteContext ParquetReader::_get_position_delete_ctx(
+ const tparquet::RowGroup& row_group, const RowGroupReader::RowGroupIndex& row_group_index) {
+ if (_delete_rows == nullptr) {
+ return RowGroupReader::PositionDeleteContext(row_group.num_rows, row_group_index.first_row);
+ }
+ int64_t* delete_rows = const_cast<int64_t*>(&(*_delete_rows)[0]);
+ int64_t* delete_rows_end = delete_rows + _delete_rows->size();
+ int64_t* start_pos = std::lower_bound(delete_rows + _delete_rows_index, delete_rows_end,
+ row_group_index.first_row);
+ int64_t start_index = start_pos - delete_rows;
+ int64_t* end_pos = std::lower_bound(start_pos, delete_rows_end, row_group_index.last_row);
+ int64_t end_index = end_pos - delete_rows;
+ _delete_rows_index = end_index;
+ return RowGroupReader::PositionDeleteContext(*_delete_rows, row_group.num_rows,
+ row_group_index.first_row, start_index, end_index);
}
-Status ParquetReader::_init_row_group_readers(const bool& filter_groups) {
- std::vector<RowGroupIndex> group_indexes;
- RETURN_IF_ERROR(_filter_row_groups(filter_groups, group_indexes));
- DCHECK_EQ(group_indexes.size(), _read_row_groups.size());
- auto group_index = group_indexes.begin();
- for (auto row_group_id : _read_row_groups) {
- auto& row_group = _t_metadata->row_groups[row_group_id];
- std::shared_ptr<RowGroupReader> row_group_reader;
- row_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns, *group_index,
- row_group, _ctz, _lazy_read_ctx));
- group_index++;
- RETURN_IF_ERROR(_process_page_index(row_group));
- if (_row_ranges.empty()) {
- _row_ranges.emplace_back(0, row_group.num_rows);
- _statistics.read_rows += row_group.num_rows;
- }
- RETURN_IF_ERROR(row_group_reader->init(_file_metadata->schema(), _col_offsets));
- row_group_reader->set_row_ranges(_row_ranges);
- _row_group_readers.emplace_back(row_group_reader);
- }
- if (!_next_row_group_reader()) {
- return Status::EndOfFile("No next reader");
- }
- return Status::OK();
+Status ParquetReader::_next_row_group_reader() {
+ if (_read_row_groups.empty()) {
+ _current_group_reader.reset(nullptr);
+ return Status::EndOfFile("No next RowGroupReader");
+ }
+ RowGroupReader::RowGroupIndex row_group_index = _read_row_groups.front();
+ _read_row_groups.pop_front();
+
+ // process page index and generate the ranges to read
+ auto& row_group = _t_metadata->row_groups[row_group_index.row_group_id];
+ std::vector<RowRange> candidate_row_ranges;
+ RETURN_IF_ERROR(_process_page_index(row_group, candidate_row_ranges));
+
+ RowGroupReader::PositionDeleteContext position_delete_ctx =
+ _get_position_delete_ctx(row_group, row_group_index);
+ _current_group_reader.reset(new RowGroupReader(_file_reader.get(), _read_columns,
+ row_group_index.row_group_id, row_group, _ctz,
+ position_delete_ctx, _lazy_read_ctx));
+ return _current_group_reader->init(_file_metadata->schema(), candidate_row_ranges,
+ _col_offsets);
}
-Status ParquetReader::_filter_row_groups(const bool& enabled,
- std::vector<RowGroupIndex>& group_indexes) {
+Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
SCOPED_RAW_TIMER(&_statistics.row_group_filter_time);
- if (enabled && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
- return Status::EndOfFile("No row group need read");
+ if (is_filter_groups && (_total_groups == 0 || _t_metadata->num_rows == 0 || _range_size < 0)) {
+ return Status::EndOfFile("No row group to read");
}
- int32_t start_row_id = 0;
+ int64_t row_index = 0;
for (int32_t row_group_idx = 0; row_group_idx < _total_groups; row_group_idx++) {
const tparquet::RowGroup& row_group = _t_metadata->row_groups[row_group_idx];
- if (enabled && _is_misaligned_range_group(row_group)) {
+ if (is_filter_groups && _is_misaligned_range_group(row_group)) {
+ row_index += row_group.num_rows;
continue;
}
bool filter_group = false;
- if (enabled) {
+ if (is_filter_groups) {
RETURN_IF_ERROR(_process_row_group_filter(row_group, &filter_group));
}
int64_t group_size = 0; // only calculate the needed columns
@@ -508,13 +433,12 @@ Status ParquetReader::_filter_row_groups(const bool& enabled,
group_size += row_group.columns[parquet_col_id].meta_data.total_compressed_size;
}
}
- // record row group physical id
- int32_t first_row_index = start_row_id;
- int32_t last_row_index = first_row_index + row_group.num_rows;
- start_row_id = last_row_index + 1;
if (!filter_group) {
- group_indexes.emplace_back(row_group_idx, first_row_index, last_row_index);
- _read_row_groups.emplace_back(row_group_idx);
+ _read_row_groups.emplace_back(row_group_idx, row_index, row_index + row_group.num_rows);
+ if (_statistics.read_row_groups == 0) {
+ _whole_range.first_row = row_index;
+ }
+ _whole_range.last_row = row_index + row_group.num_rows;
_statistics.read_row_groups++;
_statistics.read_bytes += group_size;
} else {
@@ -522,6 +446,11 @@ Status ParquetReader::_filter_row_groups(const bool& enabled,
_statistics.filtered_bytes += group_size;
_statistics.filtered_group_rows += row_group.num_rows;
}
+ row_index += row_group.num_rows;
+ }
+
+ if (_read_row_groups.empty()) {
+ return Status::EndOfFile("No row group to read");
}
return Status::OK();
}
@@ -545,13 +474,22 @@ bool ParquetReader::_has_page_index(const std::vector<tparquet::ColumnChunk>& co
return page_index.check_and_get_page_index_ranges(columns);
}
-Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
+Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group,
+ std::vector<RowRange>& candidate_row_ranges) {
SCOPED_RAW_TIMER(&_statistics.page_index_filter_time);
+
+ std::function<void()> read_whole_row_group = [&]() {
+ candidate_row_ranges.emplace_back(0, row_group.num_rows);
+ _statistics.read_rows += row_group.num_rows;
+ };
+
if (_colname_to_value_range == nullptr || _colname_to_value_range->empty()) {
+ read_whole_row_group();
return Status::OK();
}
PageIndex page_index;
if (!_has_page_index(row_group.columns, page_index)) {
+ read_whole_row_group();
return Status::OK();
}
uint8_t col_index_buff[page_index._column_index_size];
@@ -573,12 +511,12 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
auto& chunk = row_group.columns[read_col._parquet_col_id];
tparquet::ColumnIndex column_index;
if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
- return Status::OK();
+ continue;
}
RETURN_IF_ERROR(page_index.parse_column_index(chunk, col_index_buff, &column_index));
const int num_of_pages = column_index.null_pages.size();
if (num_of_pages <= 0) {
- break;
+ continue;
}
auto& conjuncts = conjunct_iter->second;
std::vector<int> skipped_page_range;
@@ -600,6 +538,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
_col_offsets.emplace(read_col._parquet_col_id, offset_index);
}
if (skipped_row_ranges.empty()) {
+ read_whole_row_group();
return Status::OK();
}
@@ -617,14 +556,14 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
}
} else {
// read row with candidate ranges rather than skipped ranges
- _row_ranges.emplace_back(skip_end, skip_range.first_row);
+ candidate_row_ranges.emplace_back(skip_end, skip_range.first_row);
read_rows += skip_range.first_row - skip_end;
skip_end = skip_range.last_row;
}
}
DCHECK_LE(skip_end, row_group.num_rows);
if (skip_end != row_group.num_rows) {
- _row_ranges.emplace_back(skip_end, row_group.num_rows);
+ candidate_row_ranges.emplace_back(skip_end, row_group.num_rows);
read_rows += row_group.num_rows - skip_end;
}
_statistics.read_rows += read_rows;
@@ -676,13 +615,13 @@ Status ParquetReader::_process_column_stat_filter(const std::vector<tparquet::Co
void ParquetReader::_init_chunk_dicts() {}
Status ParquetReader::_process_dict_filter(bool* filter_group) {
- return Status();
+ return Status::OK();
}
void ParquetReader::_init_bloom_filter() {}
Status ParquetReader::_process_bloom_filter(bool* filter_group) {
- return Status();
+ return Status::OK();
}
int64_t ParquetReader::_get_column_start_offset(const tparquet::ColumnMetaData& column) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h b/be/src/vec/exec/format/parquet/vparquet_reader.h
index f5891754e7..2bfc74f823 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -77,9 +77,10 @@ public:
void close();
- Status file_metadata(FileMetaData** metadata);
+ RowRange get_whole_range() { return _whole_range; }
- void merge_delete_row_ranges(const std::set<RowRange>& delete_row_ranges);
+ // set the delete rows in current parquet file
+ void set_delete_rows(const std::vector<int64_t>* delete_rows) { _delete_rows = delete_rows; }
int64_t size() const { return _file_reader->size(); }
@@ -87,8 +88,8 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
- Status get_parsered_schema(std::vector<std::string>* col_names,
- std::vector<TypeDescriptor>* col_types) override;
+ Status get_parsed_schema(std::vector<std::string>* col_names,
+ std::vector<TypeDescriptor>* col_types) override;
Statistics& statistics() { return _statistics; }
@@ -125,12 +126,16 @@ private:
Status _open_file();
void _init_profile();
- bool _next_row_group_reader();
+ Status _next_row_group_reader();
+ RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
+ const tparquet::RowGroup& row_group,
+ const RowGroupReader::RowGroupIndex& row_group_index);
Status _init_read_columns();
- Status _init_row_group_readers(const bool& filter_groups);
+ Status _init_row_groups(const bool& is_filter_groups);
// Page Index Filter
bool _has_page_index(const std::vector<tparquet::ColumnChunk>& columns, PageIndex& page_index);
- Status _process_page_index(const tparquet::RowGroup& row_group);
+ Status _process_page_index(const tparquet::RowGroup& row_group,
+ std::vector<RowRange>& candidate_row_ranges);
// Row Group Filter
bool _is_misaligned_range_group(const tparquet::RowGroup& row_group);
@@ -141,28 +146,27 @@ private:
Status _process_dict_filter(bool* filter_group);
void _init_bloom_filter();
Status _process_bloom_filter(bool* filter_group);
- Status _filter_row_groups(const bool& enabled, std::vector<RowGroupIndex>& group_indexes);
int64_t _get_column_start_offset(const tparquet::ColumnMetaData& column_init_column_readers);
-private:
RuntimeProfile* _profile;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
std::unique_ptr<FileReader> _file_reader = nullptr;
- std::vector<RowRange> _row_ranges;
std::shared_ptr<FileMetaData> _file_metadata;
const tparquet::FileMetaData* _t_metadata;
- std::list<std::shared_ptr<RowGroupReader>> _row_group_readers;
- std::shared_ptr<RowGroupReader> _current_group_reader;
+ std::unique_ptr<RowGroupReader> _current_group_reader;
int32_t _total_groups; // num of groups(stripes) of a parquet(orc) file
std::map<std::string, int> _map_column; // column-name <---> column-index
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
std::vector<ParquetReadColumn> _read_columns;
+ RowRange _whole_range = RowRange(0, 0);
+ const std::vector<int64_t>* _delete_rows = nullptr;
+ int64_t _delete_rows_index = 0;
// Used for column lazy read.
RowGroupReader::LazyReadContext _lazy_read_ctx;
- std::list<int32_t> _read_row_groups;
+ std::list<RowGroupReader::RowGroupIndex> _read_row_groups;
// parquet file reader object
size_t _batch_size;
int64_t _range_start_offset;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 9e2f659772..332aa44e54 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -17,12 +17,10 @@
#include "iceberg_reader.h"
-#include <vec/core/column_with_type_and_name.h>
-#include <vec/exec/format/parquet/vparquet_reader.h>
-
-#include <vec/data_types/data_type_factory.hpp>
-
#include "vec/common/assert_cast.h"
+#include "vec/core/column_with_type_and_name.h"
+#include "vec/data_types/data_type_factory.hpp"
+#include "vec/exec/format/parquet/vparquet_reader.h"
namespace doris::vectorized {
@@ -34,9 +32,11 @@ IcebergTableReader::IcebergTableReader(GenericReader* file_format_reader, Runtim
: TableFormatReader(file_format_reader), _profile(profile), _state(state), _params(params) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
- _iceberg_profile._delete_files_init_time =
- ADD_CHILD_TIMER(_profile, "DeleteFileInitTime", iceberg_profile);
- _iceberg_profile._delete_files_read_total_time =
+ _iceberg_profile.num_delete_files =
+ ADD_CHILD_COUNTER(_profile, "NumDeleteFiles", TUnit::UNIT, iceberg_profile);
+ _iceberg_profile.num_delete_rows =
+ ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT, iceberg_profile);
+ _iceberg_profile.delete_files_read_time =
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
}
@@ -63,162 +63,159 @@ Status IcebergTableReader::get_columns(
return _file_format_reader->get_columns(name_to_type, missing_cols);
}
-void IcebergTableReader::filter_rows(const TFileRangeDesc& range) {
- if (_cur_delete_file_reader == nullptr) {
- return;
- }
- SCOPED_TIMER(_iceberg_profile._delete_files_read_total_time);
+Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
auto& table_desc = range.table_format_params.iceberg_params;
auto& version = table_desc.format_version;
if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
- return;
+ return Status::OK();
}
- bool eof = false;
- std::set<RowRange> delete_row_ranges;
- while (!eof) {
- size_t read_rows = 0;
- Block block = Block();
- for (const FieldSchema& field : _column_schemas) {
- DataTypePtr data_type = DataTypeFactory::instance().create_data_type(field.type, true);
- MutableColumnPtr data_column = data_type->create_column();
- block.insert(ColumnWithTypeAndName(std::move(data_column), data_type, field.name));
- }
- Status st = _cur_delete_file_reader->get_next_block(&block, &read_rows, &eof);
- if (!st.ok() || eof) {
- if (!_delete_file_readers.empty()) {
- eof = false;
- _cur_delete_file_reader = std::move(_delete_file_readers.front());
- _delete_file_readers.pop_front();
- }
- }
- if (read_rows != 0) {
- auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS);
- ColumnPtr pos_column = pos_type_column.column;
- using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
- if (pos_type_column.type->is_nullable()) {
- pos_column =
- assert_cast<const ColumnNullable&>(*pos_column).get_nested_column_ptr();
- }
- auto& data = assert_cast<const ColumnType&>(*pos_column).get_data();
- std::vector<int64_t> delete_row_ids;
- for (int row_id = 0; row_id < read_rows; row_id++) {
- delete_row_ids.emplace_back(data[row_id]);
+ auto& delete_file_type = table_desc.content;
+ auto files = table_desc.delete_files;
+ if (files.empty()) {
+ return Status::OK();
+ }
+ if (delete_file_type == POSITION_DELETE) {
+ // position delete
+ SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
+ auto row_desc = RowDescriptor(_state->desc_tbl(),
+ std::vector<TupleId>({table_desc.delete_table_tuple_id}),
+ std::vector<bool>({false}));
+ RETURN_IF_ERROR(VExpr::create_expr_tree(_state->obj_pool(), table_desc.file_select_conjunct,
+ &_data_path_conjunct_ctx));
+ RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
+ RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
+
+ ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
+ RowRange whole_range = parquet_reader->get_whole_range();
+ bool init_schema = false;
+ std::vector<std::string> delete_file_col_names;
+ std::vector<TypeDescriptor> delete_file_col_types;
+ std::list<std::vector<int64_t>> delete_rows_list;
+ delete_rows_list.resize(files.size());
+ int64_t num_delete_rows = 0;
+ auto delete_rows_iter = delete_rows_list.begin();
+ for (auto& delete_file : files) {
+ if (whole_range.last_row <= delete_file.position_lower_bound ||
+ whole_range.first_row > delete_file.position_upper_bound) {
+ delete_rows_iter++;
+ continue;
}
- if (delete_row_ids.empty()) {
- return;
+ std::vector<int64_t>& delete_rows = *delete_rows_iter;
+ TFileRangeDesc delete_range;
+ delete_range.path = delete_file.path;
+ delete_range.start_offset = 0;
+ delete_range.size = -1;
+ delete_range.file_size = -1;
+ ParquetReader delete_reader(_profile, _params, delete_range, 102400,
+ const_cast<cctz::time_zone*>(&_state->timezone_obj()));
+ if (!init_schema) {
+ delete_reader.get_parsed_schema(&delete_file_col_names, &delete_file_col_types);
+ init_schema = true;
}
-
- int num_deleted_ids = delete_row_ids.size();
- int i = 0;
- while (i < num_deleted_ids) {
- int64_t row_id = delete_row_ids[i];
- int64_t row_range_start = row_id;
- int64_t row_range_end = row_id;
- while (i + 1 < num_deleted_ids) {
- if (delete_row_ids[i + 1] == delete_row_ids[i] + 1) {
- row_range_end = delete_row_ids[i + 1];
- i++;
- continue;
- } else {
- delete_row_ranges.emplace(row_range_start, row_range_end + 1);
- row_range_start = ++row_range_end;
- break;
- }
+ RETURN_IF_ERROR(delete_reader.init_reader(delete_file_col_names, nullptr,
+ _data_path_conjunct_ctx, false));
+ std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+ partition_columns;
+ std::unordered_map<std::string, VExprContext*> missing_columns;
+ delete_reader.set_fill_columns(partition_columns, missing_columns);
+
+ bool eof = false;
+ while (!eof) {
+ Block block = Block();
+ for (int i = 0; i < delete_file_col_names.size(); ++i) {
+ DataTypePtr data_type = DataTypeFactory::instance().create_data_type(
+ delete_file_col_types[i], true);
+ MutableColumnPtr data_column = data_type->create_column();
+ block.insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+ delete_file_col_names[i]));
}
- if (i == num_deleted_ids - 1) {
- delete_row_ranges.emplace(row_range_start,
- delete_row_ids[num_deleted_ids - 1] + 1);
+ eof = false;
+ size_t read_rows = 0;
+ RETURN_IF_ERROR(delete_reader.get_next_block(&block, &read_rows, &eof));
+ if (read_rows > 0) {
+ auto& pos_type_column = block.get_by_name(ICEBERG_ROW_POS);
+ ColumnPtr pos_column = pos_type_column.column;
+ using ColumnType = typename PrimitiveTypeTraits<TYPE_BIGINT>::ColumnType;
+ if (pos_type_column.type->is_nullable()) {
+ pos_column = assert_cast<const ColumnNullable&>(*pos_column)
+ .get_nested_column_ptr();
+ }
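+ // Positions within a delete file are written in ascending order, so two
+ // binary searches bound the sub-array that falls inside the whole range.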
+ const int64_t* src_data =
+ assert_cast<const ColumnType&>(*pos_column).get_data().data();
+ const int64_t* src_data_end = src_data + read_rows;
+ const int64_t* cpy_start =
+ std::lower_bound(src_data, src_data_end, whole_range.first_row);
+ const int64_t* cpy_end =
+ std::lower_bound(cpy_start, src_data_end, whole_range.last_row);
+ int64_t cpy_count = cpy_end - cpy_start;
+
+ if (cpy_count > 0) {
+ int64_t origin_size = delete_rows.size();
+ delete_rows.resize(origin_size + cpy_count);
+ int64_t* dest_position = &delete_rows[origin_size];
+ memcpy(dest_position, cpy_start, cpy_count * sizeof(int64_t));
+ num_delete_rows += cpy_count;
+ }
}
- row_range_start = delete_row_ids[i + 1];
- i++;
}
+ delete_rows_iter++;
}
- }
- if (VLOG_IS_ON(3)) {
- if (!delete_row_ranges.empty()) {
- std::stringstream out;
- out << "[";
- for (const RowRange& delete_row_range : delete_row_ranges) {
- out << " " << delete_row_range.debug_string();
+ if (num_delete_rows > 0) {
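+ // Drop empty per-file lists before merging; erase(iter++) keeps the
+ // list iterator valid while erasing.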
+ for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end();) {
+ if (iter->empty()) {
+ delete_rows_list.erase(iter++);
+ } else {
+ iter++;
+ }
}
- out << " ]";
- VLOG_NOTICE << "Delete row range info: " << out.str();
+ _merge_sort(delete_rows_list, num_delete_rows);
+ parquet_reader->set_delete_rows(&_delete_rows);
+ COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
}
}
- ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
- parquet_reader->merge_delete_row_ranges(delete_row_ranges);
+ // todo: equality delete
+ COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
+ return Status::OK();
}
-Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
- auto& table_desc = range.table_format_params.iceberg_params;
- auto& version = table_desc.format_version;
- if (version >= MIN_SUPPORT_DELETE_FILES_VERSION) {
- SCOPED_TIMER(_iceberg_profile._delete_files_init_time);
- auto& delete_file_type = table_desc.content;
- auto files = table_desc.delete_files;
- if (delete_file_type == POSITON_DELELE) {
- // position delete
- auto row_desc = RowDescriptor(_state->desc_tbl(),
- std::vector<TupleId>({table_desc.delete_table_tuple_id}),
- std::vector<bool>({false}));
- RETURN_IF_ERROR(VExpr::create_expr_tree(
- _state->obj_pool(), table_desc.file_select_conjunct, &_data_path_conjunct_ctx));
- RETURN_IF_ERROR(_data_path_conjunct_ctx->prepare(_state, row_desc));
- RETURN_IF_ERROR(_data_path_conjunct_ctx->open(_state));
- vector<std::string> names;
- for (auto& delete_file : files) {
- _position_delete_params.low_bound_index = delete_file.position_lower_bound;
- _position_delete_params.upper_bound_index = delete_file.position_upper_bound;
-
- TFileRangeDesc delete_range;
- delete_range.path = delete_file.path;
- delete_range.start_offset = 0;
- delete_range.size = -1;
- delete_range.file_size = -1;
- ParquetReader* delete_reader = new ParquetReader(
- _profile, _params, delete_range, _state->query_options().batch_size,
- const_cast<cctz::time_zone*>(&_state->timezone_obj()));
- if (_delete_file_schema == nullptr) {
- FileMetaData* metadata = nullptr;
- RETURN_IF_ERROR(delete_reader->file_metadata(&metadata));
- if (metadata == nullptr) {
- break;
- }
- _delete_file_schema = &metadata->schema();
- int num_of_col = _delete_file_schema->size();
- for (auto i = 0; i < num_of_col; ++i) {
- const FieldSchema* field = _delete_file_schema->get_column(i);
- _column_schemas.emplace_back(*field);
- names.emplace_back(field->name);
- }
- }
- DCHECK_EQ(_column_schemas.size(), _delete_file_schema->size());
- // The expr assure reading delete file data from current file range only
- Status d_st =
- delete_reader->init_reader(names, nullptr, _data_path_conjunct_ctx, false);
- std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
- partition_columns;
- std::unordered_map<std::string, VExprContext*> missing_columns;
- delete_reader->set_fill_columns(partition_columns, missing_columns);
-
- _delete_file_readers.emplace_back((GenericReader*)delete_reader);
-
- ParquetReader* parquet_reader = (ParquetReader*)(_file_format_reader.get());
- FileMetaData* file_metadata = nullptr;
- RETURN_IF_ERROR(parquet_reader->file_metadata(&file_metadata));
- _position_delete_params.total_file_rows = file_metadata->to_thrift().num_rows;
- }
- if (!_delete_file_readers.empty()) {
- _cur_delete_file_reader = std::move(_delete_file_readers.front());
- _delete_file_readers.pop_front();
- } else {
- _cur_delete_file_reader = nullptr;
- }
+void IcebergTableReader::_merge_sort(std::list<std::vector<int64_t>>& delete_rows_list,
+ int64_t num_delete_rows) {
+ if (delete_rows_list.empty()) {
+ return;
+ }
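+ // Fast paths: a single list is copied wholesale; two lists use std::merge.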
+ if (delete_rows_list.size() == 1) {
+ _delete_rows.resize(num_delete_rows);
+ memcpy(&_delete_rows[0], &(delete_rows_list.front()[0]), sizeof(int64_t) * num_delete_rows);
+ return;
+ }
+ if (delete_rows_list.size() == 2) {
+ _delete_rows.resize(num_delete_rows);
+ std::merge(delete_rows_list.front().begin(), delete_rows_list.front().end(),
+ delete_rows_list.back().begin(), delete_rows_list.back().end(),
+ _delete_rows.begin());
+ return;
+ }
+
+ // General case: k-way merge of the sorted lists via a min-heap of iterator pairs.
+ using vec_pair =
+ std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::const_iterator>;
+ auto cmp = [](const vec_pair& a, const vec_pair& b) { return *a.first > *b.first; };
+ std::priority_queue<vec_pair, vector<vec_pair>, decltype(cmp)> pq(cmp);
+ for (auto iter = delete_rows_list.begin(); iter != delete_rows_list.end(); ++iter) {
+ if (iter->size() > 0) {
+ pq.push({iter->begin(), iter->end()});
+ }
+ }
+ _delete_rows.reserve(num_delete_rows);
+ while (!pq.empty()) {
+ vec_pair p = pq.top();
+ pq.pop();
+ _delete_rows.emplace_back(*p.first);
+ p.first++;
+ if (p.first != p.second) {
+ pq.push(p);
}
}
- // todo: equality delete
- filter_rows(range);
- return Status::OK();
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
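
The position-clipping logic above depends on delete positions being sorted within
each delete file, so two binary searches can bound the copy window instead of
filtering row by row. A minimal standalone sketch of that technique;
append_rows_in_range and its parameters are illustrative names, not part of this patch:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstring>
    #include <vector>

    // Append the positions from src[0..n) that fall inside [first_row, last_row)
    // to *out. src must be sorted ascending, as delete positions are.
    void append_rows_in_range(const int64_t* src, size_t n, int64_t first_row,
                              int64_t last_row, std::vector<int64_t>* out) {
        const int64_t* end = src + n;
        const int64_t* from = std::lower_bound(src, end, first_row);
        const int64_t* to = std::lower_bound(from, end, last_row);
        if (to > from) {
            size_t old_size = out->size();
            out->resize(old_size + (to - from));
            std::memcpy(out->data() + old_size, from,
                        (to - from) * sizeof(int64_t));
        }
    }
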
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index c71ffdaa9c..0a9d4ef71f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -17,12 +17,11 @@
#pragma once
-#include <vec/exec/format/parquet/parquet_common.h>
-
#include <queue>
#include "table_format_reader.h"
#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/parquet/parquet_common.h"
#include "vec/exprs/vexpr.h"
namespace doris::vectorized {
@@ -32,8 +31,8 @@ public:
IcebergTableReader(GenericReader* file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params);
~IcebergTableReader() override;
- Status init_row_filters(const TFileRangeDesc& range);
- void filter_rows(const TFileRangeDesc& range) override;
+
+ Status init_row_filters(const TFileRangeDesc& range) override;
Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
@@ -45,30 +44,23 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) override;
-public:
- enum { DATA, POSITON_DELELE, EQULITY_DELELE };
- struct PositionDeleteParams {
- int64_t low_bound_index = -1;
- int64_t upper_bound_index = -1;
- int64_t last_delete_row_index = -1;
- int64_t total_file_rows = 0;
- };
+ enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
private:
struct IcebergProfile {
- RuntimeProfile::Counter* _delete_files_init_time;
- RuntimeProfile::Counter* _delete_files_read_total_time;
+ RuntimeProfile::Counter* num_delete_files;
+ RuntimeProfile::Counter* num_delete_rows;
+ RuntimeProfile::Counter* delete_files_read_time;
};
+
+ void _merge_sort(std::list<std::vector<int64_t>>& delete_rows_list, int64_t num_delete_rows);
+
RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
- std::vector<FieldSchema> _column_schemas;
- std::deque<std::unique_ptr<GenericReader>> _delete_file_readers;
- std::unique_ptr<GenericReader> _cur_delete_file_reader;
- PositionDeleteParams _position_delete_params;
- const FieldDescriptor* _delete_file_schema = nullptr;
VExprContext* _data_path_conjunct_ctx = nullptr;
IcebergProfile _iceberg_profile;
+ std::vector<int64_t> _delete_rows;
};
} // namespace doris::vectorized
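
_merge_sort, declared above, performs a heap-based k-way merge of the per-file
sorted position lists. A self-contained sketch of that technique, assuming every
input vector is already sorted ascending; merge_sorted_lists is an illustrative
name, not part of this patch:

    #include <cstdint>
    #include <list>
    #include <queue>
    #include <utility>
    #include <vector>

    std::vector<int64_t> merge_sorted_lists(std::list<std::vector<int64_t>>& lists,
                                            size_t total_rows) {
        std::vector<int64_t> merged;
        merged.reserve(total_rows);
        using Iters = std::pair<std::vector<int64_t>::iterator,
                                std::vector<int64_t>::iterator>;
        // Min-heap ordered by the value each cursor currently points at.
        auto cmp = [](const Iters& a, const Iters& b) { return *a.first > *b.first; };
        std::priority_queue<Iters, std::vector<Iters>, decltype(cmp)> pq(cmp);
        for (auto& v : lists) {
            if (!v.empty()) {
                pq.push({v.begin(), v.end()});
            }
        }
        while (!pq.empty()) {
            Iters p = pq.top();
            pq.pop();
            merged.emplace_back(*p.first);
            if (++p.first != p.second) {
                pq.push(p); // this list still has elements; reinsert its cursor
            }
        }
        return merged;
    }

The one-list and two-list fast paths in the committed code avoid the heap
entirely; the heap only pays off once several delete files contribute positions.
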
diff --git a/be/src/vec/exec/format/table/table_format_reader.h b/be/src/vec/exec/format/table/table_format_reader.h
index 188d41a2b9..1c2ebec7d3 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -17,12 +17,11 @@
#pragma once
-#include <vec/exec/format/parquet/parquet_common.h>
-
#include <string>
#include "runtime/runtime_state.h"
#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/parquet/parquet_common.h"
namespace doris::vectorized {
@@ -38,7 +37,7 @@ public:
return _file_format_reader->get_columns(name_to_type, missing_cols);
}
- virtual void filter_rows(const TFileRangeDesc& range) = 0;
+ virtual Status init_row_filters(const TFileRangeDesc& range) = 0;
protected:
std::string _table_format; // hudi, iceberg
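
Making init_row_filters return Status (instead of the void filter_rows it
replaces) lets delete-file loading failures propagate to the caller. A simplified
sketch of that shape; the types and macro below are stand-ins, not the real
Doris classes:

    #include <string>
    #include <utility>

    struct Status {
        std::string msg;
        static Status OK() { return {}; }
        static Status Error(std::string m) { return {std::move(m)}; }
        bool ok() const { return msg.empty(); }
    };

    // Early-return on failure, mirroring the RETURN_IF_ERROR usage above.
    #define RETURN_IF_ERROR(stmt)        \
        do {                             \
            Status _s = (stmt);          \
            if (!_s.ok()) return _s;     \
        } while (false)

    struct TFileRangeDesc { std::string path; };

    struct TableFormatReaderIface {
        virtual ~TableFormatReaderIface() = default;
        virtual Status init_row_filters(const TFileRangeDesc& range) = 0;
    };

    struct DemoTableReader : TableFormatReaderIface {
        Status load_delete_files(const std::string&) { return Status::OK(); }
        Status init_row_filters(const TFileRangeDesc& range) override {
            RETURN_IF_ERROR(load_delete_files(range.path)); // failure propagates
            return Status::OK();
        }
    };
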
diff --git a/be/test/vec/exec/parquet/parquet_thrift_test.cpp b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
index c8eeb5d00b..d1f59e9234 100644
--- a/be/test/vec/exec/parquet/parquet_thrift_test.cpp
+++ b/be/test/vec/exec/parquet/parquet_thrift_test.cpp
@@ -413,13 +413,13 @@ TEST_F(ParquetThriftReaderTest, group_reader) {
TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, ctz);
auto row_group = t_metadata.row_groups[0];
std::shared_ptr<RowGroupReader> row_group_reader;
- row_group_reader.reset(new RowGroupReader(&file_reader, read_columns,
- RowGroupIndex(0, 0, 10000), row_group, &ctz,
- lazy_read_ctx));
+ RowGroupReader::PositionDeleteContext position_delete_ctx(row_group.num_rows, 0);
+ row_group_reader.reset(new RowGroupReader(&file_reader, read_columns, 0, row_group, &ctz,
+ position_delete_ctx, lazy_read_ctx));
+ std::vector<RowRange> row_ranges;
+ row_ranges.emplace_back(0, row_group.num_rows);
auto col_offsets = std::unordered_map<int, tparquet::OffsetIndex>();
- auto stg = row_group_reader->init(meta_data->schema(), col_offsets);
- std::vector<RowRange> row_ranges = std::vector<RowRange>();
- row_group_reader->set_row_ranges(row_ranges);
+ auto stg = row_group_reader->init(meta_data->schema(), row_ranges, col_offsets);
EXPECT_TRUE(stg.ok());
vectorized::Block block;