You are viewing a plain-text version of this content. Its canonical link was a hyperlink in the original page and is not preserved in this copy.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/27 16:04:34 UTC
[doris] branch master updated: [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fa14b7ea9c [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)
fa14b7ea9c is described below
commit fa14b7ea9c23bb56293c564ee3a5c0a0368b868a
Author: Qi Chen <ka...@gmail.com>
AuthorDate: Sat Jan 28 00:04:27 2023 +0800
[Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)
close #16023
---
.../exec/format/parquet/vparquet_group_reader.cpp | 197 +++++++++++++++++----
.../exec/format/parquet/vparquet_group_reader.h | 8 +
2 files changed, 168 insertions(+), 37 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0c4ff2802c..f4818783f6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -98,42 +98,35 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
_fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
- Status st =
- VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
+ if (block->rows() == 0) {
+ *read_rows = block->rows();
+ return Status::OK();
+ }
+
+ RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
+
+ std::vector<uint32_t> columns_to_filter;
+ int column_to_keep = block->columns();
+ columns_to_filter.resize(column_to_keep);
+ for (uint32_t i = 0; i < column_to_keep; ++i) {
+ columns_to_filter[i] = i;
+ }
+ if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
+ int result_column_id = -1;
+ RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &result_column_id));
+ ColumnPtr filter_column = block->get_by_position(result_column_id).column;
+ RETURN_IF_ERROR(_filter_block(block, filter_column, column_to_keep, columns_to_filter));
+ } else {
+ RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter));
+ }
+
*read_rows = block->rows();
- return st;
+ return Status::OK();
}
}
void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
- // row_ranges is generated from page index, and the row index begins with 0 in each row group.
- // _position_delete_ctx is generated from delete file, and the row index begins with 0 in parquet file
- for (auto& range : row_ranges) {
- int64_t start_row_id = range.first_row;
- while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
- const int64_t& delete_row_id =
- _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
- _position_delete_ctx.first_row_id;
- if (delete_row_id < range.first_row) {
- _position_delete_ctx.index++;
- } else if (delete_row_id < range.last_row) {
- if (start_row_id < delete_row_id) {
- _read_ranges.emplace_back(start_row_id, delete_row_id);
- }
- start_row_id = delete_row_id + 1;
- _position_delete_ctx.index++;
- } else { // delete_row_id >= range.last_row
- if (start_row_id < range.last_row) {
- _read_ranges.emplace_back(start_row_id, range.last_row);
- start_row_id = range.last_row + 1;
- }
- break;
- }
- }
- if (start_row_id < range.last_row) {
- _read_ranges.emplace_back(start_row_id, range.last_row);
- }
- }
+ // Read ranges are now exactly the page-index row ranges (row index starts at 0 in each
+ // row group). Position deletes are no longer folded into the ranges here; they are applied
+ // per batch as a row filter by _build_pos_delete_filter().
+ _read_ranges = row_ranges;
}
Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns,
@@ -194,6 +187,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
_lazy_read_ctx.predicate_partition_columns));
RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
_lazy_read_ctx.predicate_missing_columns));
+
+ RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
+
// generate filter vector
if (_lazy_read_ctx.resize_first_column) {
// VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
@@ -262,8 +258,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
// generated from next batch, so the filter column is removed ahead.
DCHECK_EQ(block->rows(), 0);
} else {
- Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
- origin_column_num);
+ ColumnPtr filter_column = block->get_by_position(filter_column_id).column;
+ RETURN_IF_ERROR(_filter_block(block, filter_column, origin_column_num,
+ _lazy_read_ctx.all_predicate_col_ids));
}
} else {
Block::erase_useless_column(block, origin_column_num);
@@ -302,16 +299,23 @@ const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
nullable_column->get_nested_column_ptr()->assume_mutable().get());
uint8_t* filter_data = concrete_column->get_data().data();
for (int i = 0; i < num_rows; ++i) {
- // filter null if filter_column if nullable
- filter_data[i] &= !null_map_column[i];
+ (*_filter_ptr)[i] &= (!null_map_column[i]) & filter_data[i];
}
- filter_map = filter_data;
+ filter_map = _filter_ptr->data();
}
} else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) {
// filter all
*can_filter_all = !const_column->get_bool(0);
} else {
- filter_map = assert_cast<const ColumnVector<UInt8>&>(*sv).get_data().data();
+ const IColumn::Filter& filter =
+ assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*sv).get_data();
+
+ auto* __restrict filter_data = filter.data();
+ const size_t size = filter.size();
+ for (size_t i = 0; i < size; ++i) {
+ (*_filter_ptr)[i] &= filter_data[i];
+ }
+ filter_map = filter.data();
}
return filter_map;
}
@@ -434,6 +438,125 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b
return Status::OK();
}
+/// Build `_filter_ptr` (1 = keep, 0 = drop) for the batch of `read_rows` rows that was just read,
+/// clearing the entries of rows listed in the position delete file.
+///
+/// Coordinates used below:
+///  - `delete_rows[i] - first_row_id` is a row index inside this row group;
+///  - `_read_ranges` are the row-group ranges actually read, visited in order;
+///  - `_total_read_rows` counts rows already produced from `_read_ranges` by earlier batches.
+/// Each delete row is translated into an offset inside the current batch. Delete rows beyond the
+/// current batch stay pending (`_position_delete_ctx.index` is not advanced) for the next call.
+/// NOTE(review): the loop assumes delete rows are visited in ascending order.
+Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
+    _filter_ptr = std::make_unique<IColumn::Filter>(read_rows, 1);
+    if (!_position_delete_ctx.has_filter || read_rows == 0) {
+        // Nothing to mark. With read_rows == 0 the bound check below would otherwise compute
+        // `read_rows - 1` on size_t, wrap around and write into the empty filter.
+        _total_read_rows += read_rows;
+        return Status::OK();
+    }
+    const auto batch_rows = static_cast<int64_t>(read_rows);
+    while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
+        const int64_t delete_row_index_in_row_group =
+                _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
+                _position_delete_ctx.first_row_id;
+        int64_t read_range_rows = 0;
+        int64_t remaining_read_rows = _total_read_rows + batch_rows;
+        bool handled = false;
+        for (const auto& range : _read_ranges) {
+            if (delete_row_index_in_row_group < range.first_row) {
+                // The row lies in a gap that was never read: nothing to filter.
+                ++_position_delete_ctx.index;
+                handled = true;
+                break;
+            }
+            if (delete_row_index_in_row_group < range.last_row) {
+                const int64_t index = (delete_row_index_in_row_group - range.first_row) +
+                                      read_range_rows - _total_read_rows;
+                if (index >= batch_rows) {
+                    // Belongs to a later batch; keep it pending.
+                    _total_read_rows += read_rows;
+                    return Status::OK();
+                }
+                if (index >= 0) {
+                    (*_filter_ptr)[index] = 0;
+                }
+                // A negative index means the row was already returned by an earlier batch and
+                // can no longer be filtered; consume it rather than stalling later delete rows
+                // (previously the signed/unsigned comparison made this return early forever).
+                ++_position_delete_ctx.index;
+                handled = true;
+                break;
+            }
+            // delete_row >= range.last_row: move on to the next range unless this batch ends
+            // inside the current one.
+            const int64_t range_size = range.last_row - range.first_row;
+            if (remaining_read_rows <= range_size) {
+                _total_read_rows += read_rows;
+                return Status::OK();
+            }
+            remaining_read_rows -= range_size;
+            read_range_rows += range_size;
+        }
+        if (!handled) {
+            // The delete row is past every read range (or there are no ranges). Nothing in this
+            // iteration would change, so stop instead of looping forever.
+            break;
+        }
+    }
+    _total_read_rows += read_rows;
+    return Status::OK();
+}
+
+/// AND the conjunct result `filter_column` into the position-delete mask `_filter_ptr`, filter the
+/// columns listed in `columns_to_filter` with the combined mask, then drop the extra columns via
+/// Block::erase_useless_column(block, column_to_keep).
+/// Returns InvalidArgument if a Nullable filter column does not wrap UInt8.
+Status RowGroupReader::_filter_block(Block* block, const ColumnPtr filter_column,
+                                     int column_to_keep, std::vector<uint32_t> columns_to_filter) {
+    if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+        // The nested UInt8 data is only read, so no mutable (possibly cloned) copy is needed.
+        const auto* concrete_column =
+                check_and_get_column<ColumnUInt8>(*nullable_column->get_nested_column_ptr());
+        if (concrete_column == nullptr) {
+            return Status::InvalidArgument(
+                    "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).",
+                    filter_column->get_name());
+        }
+        const auto* __restrict null_map = nullable_column->get_null_map_data().data();
+        const IColumn::Filter& filter = concrete_column->get_data();
+        const auto* __restrict filter_data = filter.data();
+        const size_t size = filter.size();
+        DCHECK_LE(size, _filter_ptr->size());
+        for (size_t i = 0; i < size; ++i) {
+            // A NULL predicate result filters the row out.
+            (*_filter_ptr)[i] &= (!null_map[i]) & filter_data[i];
+        }
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
+        if (const_column->get_bool(0)) {
+            // The predicate keeps every row, but rows cleared in _filter_ptr by the position
+            // delete file must still be removed.
+            RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+        } else {
+            for (const auto& col : columns_to_filter) {
+                std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+            }
+        }
+    } else {
+        const IColumn::Filter& filter =
+                assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*filter_column)
+                        .get_data();
+        const auto* __restrict filter_data = filter.data();
+        const size_t size = filter.size();
+        DCHECK_LE(size, _filter_ptr->size());
+        for (size_t i = 0; i < size; ++i) {
+            (*_filter_ptr)[i] &= filter_data[i];
+        }
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    }
+    Block::erase_useless_column(block, column_to_keep);
+    return Status::OK();
+}
+
+/// Filter `columns_to_filter` with the position-delete mask alone (no conjunct), then drop the
+/// extra columns via Block::erase_useless_column(block, column_to_keep).
+Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
+                                     const std::vector<uint32_t>& columns_to_filter) {
+    Status status = _filter_block_internal(block, columns_to_filter);
+    if (!status.ok()) {
+        return status;
+    }
+    Block::erase_useless_column(block, column_to_keep);
+    return Status::OK();
+}
+
+/// Apply `_filter_ptr` to each column listed in `columns_to_filter`.
+/// If the mask keeps no row, those columns are cleared in place. A column whose size already
+/// equals the number of kept rows is left as is.
+Status RowGroupReader::_filter_block_internal(Block* block,
+                                              const std::vector<uint32_t>& columns_to_filter) {
+    IColumn::Filter& mask = *_filter_ptr;
+    const size_t kept_rows =
+            mask.size() - simd::count_zero_num(reinterpret_cast<int8_t*>(mask.data()), mask.size());
+    for (const auto& col : columns_to_filter) {
+        auto& column = block->get_by_position(col).column;
+        if (kept_rows == 0) {
+            std::move(*column).assume_mutable()->clear();
+        } else if (column->size() != kept_rows) {
+            column = column->filter(mask, kept_rows);
+        }
+    }
+    return Status::OK();
+}
+
ParquetColumnReader::Statistics RowGroupReader::statistics() {
ParquetColumnReader::Statistics st;
for (auto& reader : _column_readers) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index d53e32de2a..15ff153b60 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -131,6 +131,12 @@ private:
Status _fill_missing_columns(
Block* block, size_t rows,
const std::unordered_map<std::string, VExprContext*>& missing_columns);
+    // Builds _filter_ptr (1 = keep, 0 = drop) for the next `read_rows` rows, clearing rows
+    // listed in the Iceberg v2 position delete file.
+    Status _build_pos_delete_filter(size_t read_rows);
+    // ANDs `filter_column` into _filter_ptr, filters `columns_to_filter`, then calls
+    // Block::erase_useless_column(block, column_to_keep).
+    Status _filter_block(Block* block, const ColumnPtr filter_column, int column_to_keep,
+                         std::vector<uint32_t> columns_to_filter);
+    // Same as above, but applies only the position-delete mask in _filter_ptr.
+    Status _filter_block(Block* block, int column_to_keep,
+                         const std::vector<uint32_t>& columns_to_filter);
+    // Applies _filter_ptr to the columns of `block` listed in `columns_to_filter`.
+    Status _filter_block_internal(Block* block, const std::vector<uint32_t>& columns_to_filter);
io::FileReaderSPtr _file_reader;
std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
@@ -148,5 +154,7 @@ private:
// If continuous batches are skipped, we can cache them to skip a whole page
size_t _cached_filtered_rows = 0;
std::unique_ptr<TextConverter> _text_converter = nullptr;
+ // Row mask for the current batch (1 = keep, 0 = drop): rebuilt by _build_pos_delete_filter()
+ // from the position delete rows, then AND-ed with the conjunct result before filtering.
+ std::unique_ptr<IColumn::Filter> _filter_ptr = nullptr;
+ // Rows produced from _read_ranges by previous batches; used to map delete rows to offsets
+ // inside the current batch.
+ int64_t _total_read_rows = 0;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org