Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/27 16:04:34 UTC

[doris] branch master updated: [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fa14b7ea9c [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)
fa14b7ea9c is described below

commit fa14b7ea9c23bb56293c564ee3a5c0a0368b868a
Author: Qi Chen <ka...@gmail.com>
AuthorDate: Sat Jan 28 00:04:27 2023 +0800

    [Enhancement](icebergv2) Optimize the position delete file filtering mechanism in iceberg v2 parquet reader (#16024)
    
    close #16023
---
 .../exec/format/parquet/vparquet_group_reader.cpp  | 197 +++++++++++++++++----
 .../exec/format/parquet/vparquet_group_reader.h    |   8 +
 2 files changed, 168 insertions(+), 37 deletions(-)
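
Before this patch the reader split each row group's read ranges around every
position-delete row inside _merge_read_ranges; with this change the ranges are
kept as-is and _build_pos_delete_filter instead marks deleted rows in a
per-batch filter that is later AND-ed with the predicate filter, so everything
is dropped in a single filter pass. A minimal standalone sketch of that idea
(not the committed code; it assumes the batch covers one contiguous run of
file row positions, while the real reader also walks the page-index read
ranges):

    #include <algorithm>
    #include <cstdint>
    #include <vector>

    // Hypothetical sketch: mark rows of the current batch that appear in the
    // Iceberg v2 position-delete list. delete_rows holds row positions within
    // the data file (sorted ascending); batch_first_row is the file position
    // of the first row in this batch; the result has one byte per row, 1 = keep.
    std::vector<uint8_t> build_position_delete_filter(const std::vector<int64_t>& delete_rows,
                                                      int64_t batch_first_row, size_t batch_rows) {
        std::vector<uint8_t> filter(batch_rows, 1);
        // Jump to the first delete position that can fall inside this batch.
        auto it = std::lower_bound(delete_rows.begin(), delete_rows.end(), batch_first_row);
        const int64_t batch_end = batch_first_row + static_cast<int64_t>(batch_rows);
        for (; it != delete_rows.end() && *it < batch_end; ++it) {
            filter[*it - batch_first_row] = 0; // row removed by a position delete file
        }
        return filter;
    }

The committed _build_pos_delete_filter does the same marking, but iterates
_read_ranges so that file positions skipped by the page index are accounted
for before indexing into the filter.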

diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 0c4ff2802c..f4818783f6 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -98,42 +98,35 @@ Status RowGroupReader::next_batch(Block* block, size_t batch_size, size_t* read_
                 _fill_partition_columns(block, *read_rows, _lazy_read_ctx.partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, *read_rows, _lazy_read_ctx.missing_columns));
 
-        Status st =
-                VExprContext::filter_block(_lazy_read_ctx.vconjunct_ctx, block, block->columns());
+        if (block->rows() == 0) {
+            *read_rows = block->rows();
+            return Status::OK();
+        }
+
+        RETURN_IF_ERROR(_build_pos_delete_filter(*read_rows));
+
+        std::vector<uint32_t> columns_to_filter;
+        int column_to_keep = block->columns();
+        columns_to_filter.resize(column_to_keep);
+        for (uint32_t i = 0; i < column_to_keep; ++i) {
+            columns_to_filter[i] = i;
+        }
+        if (_lazy_read_ctx.vconjunct_ctx != nullptr) {
+            int result_column_id = -1;
+            RETURN_IF_ERROR(_lazy_read_ctx.vconjunct_ctx->execute(block, &result_column_id));
+            ColumnPtr filter_column = block->get_by_position(result_column_id).column;
+            RETURN_IF_ERROR(_filter_block(block, filter_column, column_to_keep, columns_to_filter));
+        } else {
+            RETURN_IF_ERROR(_filter_block(block, column_to_keep, columns_to_filter));
+        }
+
         *read_rows = block->rows();
-        return st;
+        return Status::OK();
     }
 }
 
 void RowGroupReader::_merge_read_ranges(std::vector<RowRange>& row_ranges) {
-    // row_ranges is generated from page index, and the row index begins with 0 in each row group.
-    // _position_delete_ctx is generated from delete file, and the row index begins with 0 in parquet file
-    for (auto& range : row_ranges) {
-        int64_t start_row_id = range.first_row;
-        while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
-            const int64_t& delete_row_id =
-                    _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
-                    _position_delete_ctx.first_row_id;
-            if (delete_row_id < range.first_row) {
-                _position_delete_ctx.index++;
-            } else if (delete_row_id < range.last_row) {
-                if (start_row_id < delete_row_id) {
-                    _read_ranges.emplace_back(start_row_id, delete_row_id);
-                }
-                start_row_id = delete_row_id + 1;
-                _position_delete_ctx.index++;
-            } else { // delete_row_id >= range.last_row
-                if (start_row_id < range.last_row) {
-                    _read_ranges.emplace_back(start_row_id, range.last_row);
-                    start_row_id = range.last_row + 1;
-                }
-                break;
-            }
-        }
-        if (start_row_id < range.last_row) {
-            _read_ranges.emplace_back(start_row_id, range.last_row);
-        }
-    }
+    _read_ranges = row_ranges;
 }
 
 Status RowGroupReader::_read_column_data(Block* block, const std::vector<std::string>& columns,
@@ -194,6 +187,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
                                                 _lazy_read_ctx.predicate_partition_columns));
         RETURN_IF_ERROR(_fill_missing_columns(block, pre_read_rows,
                                               _lazy_read_ctx.predicate_missing_columns));
+
+        RETURN_IF_ERROR(_build_pos_delete_filter(pre_read_rows));
+
         // generate filter vector
         if (_lazy_read_ctx.resize_first_column) {
             // VExprContext.execute has an optimization, the filtering is executed when block->rows() > 0
@@ -262,8 +258,9 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
             // generated from next batch, so the filter column is removed ahead.
             DCHECK_EQ(block->rows(), 0);
         } else {
-            Block::filter_block(block, _lazy_read_ctx.all_predicate_col_ids, filter_column_id,
-                                origin_column_num);
+            ColumnPtr filter_column = block->get_by_position(filter_column_id).column;
+            RETURN_IF_ERROR(_filter_block(block, filter_column, origin_column_num,
+                                          _lazy_read_ctx.all_predicate_col_ids));
         }
     } else {
         Block::erase_useless_column(block, origin_column_num);
@@ -302,16 +299,23 @@ const uint8_t* RowGroupReader::_build_filter_map(ColumnPtr& sv, size_t num_rows,
                     nullable_column->get_nested_column_ptr()->assume_mutable().get());
             uint8_t* filter_data = concrete_column->get_data().data();
             for (int i = 0; i < num_rows; ++i) {
-                // filter null if filter_column if nullable
-                filter_data[i] &= !null_map_column[i];
+                (*_filter_ptr)[i] &= (!null_map_column[i]) & filter_data[i];
             }
-            filter_map = filter_data;
+            filter_map = _filter_ptr->data();
         }
     } else if (auto* const_column = check_and_get_column<ColumnConst>(*sv)) {
         // filter all
         *can_filter_all = !const_column->get_bool(0);
     } else {
-        filter_map = assert_cast<const ColumnVector<UInt8>&>(*sv).get_data().data();
+        const IColumn::Filter& filter =
+                assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*sv).get_data();
+
+        auto* __restrict filter_data = filter.data();
+        const size_t size = filter.size();
+        for (size_t i = 0; i < size; ++i) {
+            (*_filter_ptr)[i] &= filter_data[i];
+        }
+        filter_map = filter.data();
     }
     return filter_map;
 }
@@ -434,6 +438,125 @@ Status RowGroupReader::_read_empty_batch(size_t batch_size, size_t* read_rows, b
     return Status::OK();
 }
 
+Status RowGroupReader::_build_pos_delete_filter(size_t read_rows) {
+    _filter_ptr.reset(new IColumn::Filter(read_rows, 1));
+    if (!_position_delete_ctx.has_filter) {
+        _total_read_rows += read_rows;
+        return Status::OK();
+    }
+    while (_position_delete_ctx.index < _position_delete_ctx.end_index) {
+        const int64_t delete_row_index_in_row_group =
+                _position_delete_ctx.delete_rows[_position_delete_ctx.index] -
+                _position_delete_ctx.first_row_id;
+        int64_t read_range_rows = 0;
+        size_t remaining_read_rows = _total_read_rows + read_rows;
+        for (auto& range : _read_ranges) {
+            if (delete_row_index_in_row_group < range.first_row) {
+                ++_position_delete_ctx.index;
+                break;
+            } else if (delete_row_index_in_row_group < range.last_row) {
+                int64_t index = (delete_row_index_in_row_group - range.first_row) +
+                                read_range_rows - _total_read_rows;
+                if (index > read_rows - 1) {
+                    _total_read_rows += read_rows;
+                    return Status::OK();
+                }
+                (*_filter_ptr)[index] = 0;
+                ++_position_delete_ctx.index;
+                break;
+            } else { // delete_row >= range.last_row
+            }
+
+            int64_t range_size = range.last_row - range.first_row;
+            // Don't search next range when there is no remaining_read_rows.
+            if (remaining_read_rows <= range_size) {
+                _total_read_rows += read_rows;
+                return Status::OK();
+            } else {
+                remaining_read_rows -= range_size;
+                read_range_rows += range_size;
+            }
+        }
+    }
+    _total_read_rows += read_rows;
+    return Status::OK();
+}
+
+Status RowGroupReader::_filter_block(Block* block, const ColumnPtr filter_column,
+                                     int column_to_keep, std::vector<uint32_t> columns_to_filter) {
+    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
+        ColumnPtr nested_column = nullable_column->get_nested_column_ptr();
+
+        MutableColumnPtr mutable_holder =
+                nested_column->use_count() == 1
+                        ? nested_column->assume_mutable()
+                        : nested_column->clone_resized(nested_column->size());
+
+        ColumnUInt8* concrete_column = typeid_cast<ColumnUInt8*>(mutable_holder.get());
+        if (!concrete_column) {
+            return Status::InvalidArgument(
+                    "Illegal type {} of column for filter. Must be UInt8 or Nullable(UInt8).",
+                    filter_column->get_name());
+        }
+        auto* __restrict null_map = nullable_column->get_null_map_data().data();
+        IColumn::Filter& filter = concrete_column->get_data();
+        auto* __restrict filter_data = filter.data();
+
+        const size_t size = filter.size();
+        for (size_t i = 0; i < size; ++i) {
+            (*_filter_ptr)[i] &= (!null_map[i]) & filter_data[i];
+        }
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    } else if (auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
+        bool ret = const_column->get_bool(0);
+        if (!ret) {
+            for (auto& col : columns_to_filter) {
+                std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+            }
+        }
+    } else {
+        const IColumn::Filter& filter =
+                assert_cast<const doris::vectorized::ColumnVector<UInt8>&>(*filter_column)
+                        .get_data();
+
+        auto* __restrict filter_data = filter.data();
+        const size_t size = filter.size();
+        for (size_t i = 0; i < size; ++i) {
+            (*_filter_ptr)[i] &= filter_data[i];
+        }
+        RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    }
+    Block::erase_useless_column(block, column_to_keep);
+    return Status::OK();
+}
+
+Status RowGroupReader::_filter_block(Block* block, int column_to_keep,
+                                     const std::vector<uint32_t>& columns_to_filter) {
+    RETURN_IF_ERROR(_filter_block_internal(block, columns_to_filter));
+    Block::erase_useless_column(block, column_to_keep);
+
+    return Status::OK();
+}
+
+Status RowGroupReader::_filter_block_internal(Block* block,
+                                              const std::vector<uint32_t>& columns_to_filter) {
+    size_t count = _filter_ptr->size() -
+                   simd::count_zero_num((int8_t*)_filter_ptr->data(), _filter_ptr->size());
+    if (count == 0) {
+        for (auto& col : columns_to_filter) {
+            std::move(*block->get_by_position(col).column).assume_mutable()->clear();
+        }
+    } else {
+        for (auto& col : columns_to_filter) {
+            if (block->get_by_position(col).column->size() != count) {
+                block->get_by_position(col).column =
+                        block->get_by_position(col).column->filter(*_filter_ptr, count);
+            }
+        }
+    }
+    return Status::OK();
+}
+
 ParquetColumnReader::Statistics RowGroupReader::statistics() {
     ParquetColumnReader::Statistics st;
     for (auto& reader : _column_readers) {
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index d53e32de2a..15ff153b60 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -131,6 +131,12 @@ private:
     Status _fill_missing_columns(
             Block* block, size_t rows,
             const std::unordered_map<std::string, VExprContext*>& missing_columns);
+    Status _build_pos_delete_filter(size_t read_rows);
+    Status _filter_block(Block* block, const ColumnPtr filter_column, int column_to_keep,
+                         std::vector<uint32_t> columns_to_filter);
+    Status _filter_block(Block* block, int column_to_keep,
+                         const vector<uint32_t>& columns_to_filter);
+    Status _filter_block_internal(Block* block, const vector<uint32_t>& columns_to_filter);
 
     io::FileReaderSPtr _file_reader;
     std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _column_readers;
@@ -148,5 +154,7 @@ private:
     // If continuous batches are skipped, we can cache them to skip a whole page
     size_t _cached_filtered_rows = 0;
     std::unique_ptr<TextConverter> _text_converter = nullptr;
+    std::unique_ptr<IColumn::Filter> _filter_ptr = nullptr;
+    int64_t _total_read_rows = 0;
 };
 } // namespace doris::vectorized
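
The rest of the patch routes the predicate result and the position-delete
filter through one application step: _filter_block ANDs the conjunct's result
column (and its null map, when the column is nullable) into _filter_ptr, and
_filter_block_internal then compacts every column once. A rough illustration
of that apply step, using plain std::vector<int64_t> as a stand-in for Doris's
IColumn (filter_columns and Column are names assumed for this sketch):

    #include <cstdint>
    #include <numeric>
    #include <utility>
    #include <vector>

    using Column = std::vector<int64_t>; // stand-in for doris::vectorized::IColumn

    // Apply one combined 0/1 filter to every column: count survivors once,
    // then either clear the columns (nothing passed) or compact each column.
    void filter_columns(std::vector<Column>& columns, const std::vector<uint8_t>& filter) {
        const size_t keep = std::accumulate(filter.begin(), filter.end(), size_t(0));
        for (auto& col : columns) {
            if (keep == 0) {
                col.clear(); // mirrors clearing each column when the filter drops all rows
                continue;
            }
            Column out;
            out.reserve(keep);
            for (size_t i = 0; i < col.size() && i < filter.size(); ++i) {
                if (filter[i]) {
                    out.push_back(col[i]);
                }
            }
            col = std::move(out);
        }
    }

Counting the surviving rows once up front mirrors the simd::count_zero_num
call above: when nothing survives the columns are simply cleared, and
otherwise rows removed by position deletes and rows removed by the conjuncts
are filtered out in the same pass.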


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org