You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/14 06:47:39 UTC

[doris] 02/05: [fix](multi-catalog)fix page index thrift deserialize (#15001)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit df5e8ee5d3bf86f2a80b28ebb3eb4ada7ac73fa1
Author: slothever <18...@users.noreply.github.com>
AuthorDate: Tue Dec 13 13:33:19 2022 +0800

    [fix](multi-catalog)fix page index thrift deserialize (#15001)
    
    Fix the error raised when parsing the page index: Couldn't deserialize thrift msg.
    Use two separate buffers to store the column index and offset index messages, instead of parsing both from a single buffer.
---
 be/src/vec/exec/format/parquet/vparquet_page_index.cpp |  6 +++---
 be/src/vec/exec/format/parquet/vparquet_page_index.h   |  2 +-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp     | 18 +++++++++++-------
 3 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
index a5673833a8..acb56d0c2a 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.cpp
@@ -94,11 +94,11 @@ Status PageIndex::parse_column_index(const tparquet::ColumnChunk& chunk, const u
 }
 
 Status PageIndex::parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                                     int64_t buffer_size, tparquet::OffsetIndex* offset_index) {
-    int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start + _column_index_size;
+                                     tparquet::OffsetIndex* offset_index) {
+    int64_t buffer_offset = chunk.offset_index_offset - _offset_index_start;
     uint32_t length = chunk.offset_index_length;
     DCHECK_GE(buffer_offset, 0);
-    DCHECK_LE(buffer_offset + length, buffer_size);
+    DCHECK_LE(buffer_offset + length, _offset_index_size);
     RETURN_IF_ERROR(deserialize_thrift_msg(buff + buffer_offset, &length, true, offset_index));
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_page_index.h b/be/src/vec/exec/format/parquet/vparquet_page_index.h
index c5f8183e35..978a798bf4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_page_index.h
+++ b/be/src/vec/exec/format/parquet/vparquet_page_index.h
@@ -38,7 +38,7 @@ public:
     Status parse_column_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
                               tparquet::ColumnIndex* _column_index);
     Status parse_offset_index(const tparquet::ColumnChunk& chunk, const uint8_t* buff,
-                              int64_t buffer_size, tparquet::OffsetIndex* _offset_index);
+                              tparquet::OffsetIndex* _offset_index);
 
 private:
     friend class ParquetReader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index cb22f9d36e..01c226b1e4 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -545,14 +545,18 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
     if (!_has_page_index(row_group.columns, page_index)) {
         return Status::OK();
     }
-    int64_t buffer_size = page_index._column_index_size + page_index._offset_index_size;
-    uint8_t buff[buffer_size];
+    //    int64_t buffer_size = page_index._column_index_size;
+    uint8_t col_index_buff[page_index._column_index_size];
     int64_t bytes_read = 0;
-    RETURN_IF_ERROR(
-            _file_reader->readat(page_index._column_index_start, buffer_size, &bytes_read, buff));
-
+    RETURN_IF_ERROR(_file_reader->readat(page_index._column_index_start,
+                                         page_index._column_index_size, &bytes_read,
+                                         col_index_buff));
     auto& schema_desc = _file_metadata->schema();
     std::vector<RowRange> skipped_row_ranges;
+    uint8_t off_index_buff[page_index._offset_index_size];
+    RETURN_IF_ERROR(_file_reader->readat(page_index._offset_index_start,
+                                         page_index._offset_index_size, &bytes_read,
+                                         off_index_buff));
     for (auto& read_col : _read_columns) {
         auto conjunct_iter = _colname_to_value_range->find(read_col._file_slot_name);
         if (_colname_to_value_range->end() == conjunct_iter) {
@@ -563,7 +567,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
         if (chunk.column_index_offset == 0 && chunk.column_index_length == 0) {
             return Status::OK();
         }
-        RETURN_IF_ERROR(page_index.parse_column_index(chunk, buff, &column_index));
+        RETURN_IF_ERROR(page_index.parse_column_index(chunk, col_index_buff, &column_index));
         const int num_of_pages = column_index.null_pages.size();
         if (num_of_pages <= 0) {
             break;
@@ -577,7 +581,7 @@ Status ParquetReader::_process_page_index(const tparquet::RowGroup& row_group) {
             continue;
         }
         tparquet::OffsetIndex offset_index;
-        RETURN_IF_ERROR(page_index.parse_offset_index(chunk, buff, buffer_size, &offset_index));
+        RETURN_IF_ERROR(page_index.parse_offset_index(chunk, off_index_buff, &offset_index));
         for (int page_id : skipped_page_range) {
             RowRange skipped_row_range;
             page_index.create_skipped_row_range(offset_index, row_group.num_rows, page_id,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org