You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/06/30 12:38:12 UTC

[GitHub] [arrow-rs] Ted-Jiang opened a new pull request, #1977: Enable serialized_reader read specific Page by passing row ranges.

Ted-Jiang opened a new pull request, #1977:
URL: https://github.com/apache/arrow-rs/pull/1977

   # Which issue does this PR close?
   
   
   Closes #1976.
   
   # Rationale for this change
    Part support #1792 
   
   if we use page index get row ranges like below, get `row_ranges`
   ``` rust
           //filter `x < 11`
           let filter =
               |page: &PageIndex<i32>| page.max.as_ref().map(|&x| x < 11).unwrap_or(false);
   
           let mask = index.indexes.iter().map(filter).collect::<Vec<_>>();
   
           let row_ranges = compute_row_ranges(&mask, locations, total_rows).unwrap();
   ```
   we can pass the `row_ranges` to new API  to read parquet file(datafusion use this way but without `row_ranges`)
   ```
   fn get_record_reader_by_columns_and_row_ranges(
           &mut self,
           mask: ProjectionMask,
           row_ranges: &RowRanges,
           batch_size: usize,
       ) -> Result<ParquetRecordBatchReader> {
   ```
   
   # What changes are included in this PR?
   
   <!---
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   One example: if we read  col1, col2  and apply filter get the result we need read `row_ranges[20, 80]`,
   _For col1:_ 
     we need all data from page1, page2, page3. 
   _For col2:_
    after this PR, we will **filter**  page2 and keep page0, page1
        as for page1: need all data
        as for page0: we need part of its row_range(need row align **TODO**)
   ```
    * rows   col1   col2   col3
    *      ┌──────┬──────┬──────┐
    *   0  │  p0  │      │      │
    *      ╞══════╡  p0  │  p0  │
    *  20  │ p1(X)│------│------│
    *      ╞══════╪══════╡      │
    *  40  │ p2(X)│      │------│
    *      ╞══════╡ p1(X)╞══════╡
    *  60  │ p3(X)│      │------│
    *      ╞══════╪══════╡      │
    *  80  │  p4  │      │  p1  │
    *      ╞══════╡  p2  │      │
    * 100  │  p5  │      │      │
    *      └──────┴──────┴──────┘
    * 
   ```
   
   # Are there any user-facing changes?
   
   
   <!---
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   <!---
   If there are any breaking changes to public APIs, please add the `breaking change` label.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911897463


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -353,6 +392,31 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
         Ok(Box::new(page_reader))
     }
 
+    fn get_column_page_reader_with_offset_index(

Review Comment:
   We can add UT using the writer API.
   https://github.com/apache/arrow-rs/pull/1935 has been merged, the parquet file contains the column index by default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911920199


##########
parquet/src/file/page_index/filer_offset_index.rs:
##########
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // read from parquet file which before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // use to keep needed page index.
+    index_map: Vec<usize>,
+}
+
+pub(crate) type OffsetRange = (Vec<usize>, Vec<usize>);
+
+impl FilterOffsetIndex {
+    pub(crate) fn try_new(
+        offset_index: &[PageLocation],
+        ranges: &RowRanges,
+        total_row_count: i64,
+    ) -> Self {
+        let mut index = vec![];
+        for i in 0..offset_index.len() {
+            let page_location: &PageLocation = &offset_index[i];
+            let page_range = if i == offset_index.len() - 1 {
+                Range::new(
+                    page_location.first_row_index as usize,
+                    total_row_count as usize,
+                )
+            } else {
+                let next_page_location: &PageLocation = &offset_index[i + 1];
+                Range::new(
+                    page_location.first_row_index as usize,
+                    (next_page_location.first_row_index - 1) as usize,
+                )
+            };
+            if ranges.is_overlapping(&page_range) {
+                index.push(i);
+            }
+        }
+
+        FilterOffsetIndex {
+            offset_index: offset_index.to_vec(),
+            index_map: index,
+        }
+    }
+
+    pub(crate) fn get_page_count(&self) -> usize {
+        self.index_map.len()
+    }
+
+    pub(crate) fn get_offset(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index.get(index as usize).unwrap().offset
+    }
+
+    pub(crate) fn get_compressed_page_size(&self, page_index: usize) -> i32 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .compressed_page_size
+    }
+
+    pub(crate) fn get_first_row_index(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .first_row_index
+    }
+
+    pub(crate) fn get_last_row_index(
+        &self,
+        page_index: usize,
+        total_row_count: i64,
+    ) -> i64 {
+        let next_index = self.index_map[page_index] + 1;
+        if next_index >= self.get_page_count() {
+            total_row_count
+        } else {
+            self.offset_index
+                .get(next_index as usize)
+                .unwrap()
+                .first_row_index
+                - 1
+        }
+    }
+
+    // Return the offset of needed both data page and dictionary page.
+    // need input `row_group_offset` as input for checking if there is one dictionary page
+    // in one column chunk.
+    // Note: If data pages are adjacent, will merge them to one `OffsetRange`,
+    // for reduce seek effect.
+    pub(crate) fn calculate_offset_range(&self, row_group_offset: i64) -> OffsetRange {
+        let mut start_list = vec![];
+        let mut length_list = vec![];
+        let page_count = self.get_page_count();
+        if page_count > 0 {
+            //Not after filter page offset
+            let first_page_offset = self.offset_index[0].offset;
+            // add dictionary page if required

Review Comment:
   I read all code and context, I think `row_group_offset` represent the offset of the dict page. Am I right?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1172540557

   
   > Edit: I'll try and stub out some APIs for what I mean over the next couple of days. This will also help me validate my mental model checks out 😅
   
   Got it, i will delete the `get_record_reader_by_columns_and_row_ranges` and use options avoid public APIs
   
   tomorrow i  will start with
   
   > * Add a skip page function to SerializedPageReader that uses the column index to skip the next page without reading it (we may need to change it to take ChunkReader instead of Read)
   
   After read the code, i think if we want to skip page in `SerializedPageReader` we need get the page meta, but in SerializedPageReader it only care about the decode work. the pages offset already set to `buf: T` in `SerializedRowGroupReader` so i try to pass column index to `SerializedRowGroupReader` to change pages offset. 
   So one question 🤔
     So if i want to add `skip page` i need add owned page location and selected row to `SerializedPageReader`?
   
   I got worried about where should i pass the column index info.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911927683


##########
parquet/src/file/reader.rs:
##########
@@ -48,6 +49,14 @@ pub trait ChunkReader: Length + Send + Sync {
     /// get a serialy readeable slice of the current reader
     /// This should fail if the slice exceeds the current bounds
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
+
+    /// get a serially readable slice of the current reader
+    /// This should fail if the slice exceeds the current bounds
+    fn get_multi_range_read(
+        &self,
+        start_list: Vec<usize>,
+        length_list: Vec<usize>,

Review Comment:
   ```suggestion
           page_offsets: Vec<usize>,
           page_lengths: Vec<usize>,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911803945


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -353,6 +392,31 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
         Ok(Box::new(page_reader))
     }
 
+    fn get_column_page_reader_with_offset_index(

Review Comment:
   related https://github.com/apache/parquet-testing/pull/25



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] sunchao commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
sunchao commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1171995992

   Yes, I think the row ranges are internal to `parquet-rs` and should be calculated during the predicate pushdown. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911938903


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -248,21 +286,23 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         }
 
         if options.enable_page_index {
-            //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
-            //support multi after create multi-RG test data.
-            let cols = metadata.row_group(0);
-            let columns_indexes =
-                index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
-            let pages_locations =
-                index_reader::read_pages_locations(&chunk_reader, cols.columns())?;
+            let mut columns_indexes = vec![];

Review Comment:
   Column index should be loaded by lazy.
   Loading the full column index at the beginning is very heavy.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1172476897

   > I've had a quick review, unfortunately I think this is missing a key detail. In particular the arrow writer must read the same records from each of its columns. As written this simply skips reading pruned pages from columns. There is no relationship between the page boundaries across columns within a parquet, and therefore this will return different rows for each of the columns.
   
   Thanks @tustvold, your are right. Maybe I made the title confusing😭. as you mentioned in  [#1791 (review)]. (https://github.com/apache/arrow-rs/pull/1791#pullrequestreview-996352857):
   
   >Pass row selection down to RecordReader
   >Add a skip_next_page to PageReader
   >Add a skip_values to ColumnValueDecoder
   
   This pr is only about the `skip_next_page` part, we will only return the needed page metadata in iterator. As make the  same records from each of its columns (row align), i prefer support in next pr. I prefer to separate them to avoid huge PR and conflict. If you prefer to combine them, I will make this in progress and keep developing.
   
   > As described in [#1791 (review)](https://github.com/apache/arrow-rs/pull/1791#pullrequestreview-996352857), you will need to extract the row selection in addition to the page selection, and push this into RecordReader and ColumnValueDecoder. This will also make the API clearer, as we aren't going behind their back and skipping pages at the block-level
   As above, need pass the `row_ranges` to ColumnValueReader in future.
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911917693


##########
parquet/src/file/page_index/filer_offset_index.rs:
##########
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // read from parquet file which before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // use to keep needed page index.
+    index_map: Vec<usize>,
+}
+
+pub(crate) type OffsetRange = (Vec<usize>, Vec<usize>);
+
+impl FilterOffsetIndex {
+    pub(crate) fn try_new(
+        offset_index: &[PageLocation],
+        ranges: &RowRanges,
+        total_row_count: i64,
+    ) -> Self {
+        let mut index = vec![];
+        for i in 0..offset_index.len() {
+            let page_location: &PageLocation = &offset_index[i];
+            let page_range = if i == offset_index.len() - 1 {
+                Range::new(
+                    page_location.first_row_index as usize,
+                    total_row_count as usize,
+                )
+            } else {
+                let next_page_location: &PageLocation = &offset_index[i + 1];
+                Range::new(
+                    page_location.first_row_index as usize,
+                    (next_page_location.first_row_index - 1) as usize,
+                )
+            };
+            if ranges.is_overlapping(&page_range) {
+                index.push(i);
+            }
+        }
+
+        FilterOffsetIndex {
+            offset_index: offset_index.to_vec(),
+            index_map: index,
+        }
+    }
+
+    pub(crate) fn get_page_count(&self) -> usize {
+        self.index_map.len()
+    }
+
+    pub(crate) fn get_offset(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index.get(index as usize).unwrap().offset
+    }
+
+    pub(crate) fn get_compressed_page_size(&self, page_index: usize) -> i32 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .compressed_page_size
+    }
+
+    pub(crate) fn get_first_row_index(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .first_row_index
+    }
+
+    pub(crate) fn get_last_row_index(
+        &self,
+        page_index: usize,
+        total_row_count: i64,
+    ) -> i64 {
+        let next_index = self.index_map[page_index] + 1;
+        if next_index >= self.get_page_count() {
+            total_row_count
+        } else {
+            self.offset_index
+                .get(next_index as usize)
+                .unwrap()
+                .first_row_index
+                - 1
+        }
+    }
+
+    // Return the offset of needed both data page and dictionary page.
+    // need input `row_group_offset` as input for checking if there is one dictionary page
+    // in one column chunk.
+    // Note: If data pages are adjacent, will merge them to one `OffsetRange`,
+    // for reduce seek effect.
+    pub(crate) fn calculate_offset_range(&self, row_group_offset: i64) -> OffsetRange {
+        let mut start_list = vec![];
+        let mut length_list = vec![];
+        let page_count = self.get_page_count();
+        if page_count > 0 {
+            //Not after filter page offset
+            let first_page_offset = self.offset_index[0].offset;
+            // add dictionary page if required

Review Comment:
   Why do you assume that the dict page is in the first page of the column chunk?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911943811


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -248,21 +286,23 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         }
 
         if options.enable_page_index {
-            //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
-            //support multi after create multi-RG test data.
-            let cols = metadata.row_group(0);
-            let columns_indexes =
-                index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
-            let pages_locations =
-                index_reader::read_pages_locations(&chunk_reader, cols.columns())?;
+            let mut columns_indexes = vec![];
+            let mut offset_indexes = vec![];
+            for rg in &filtered_row_groups {
+                let c = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
+                let p = index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
+
+                columns_indexes.push(c);
+                offset_indexes.push(p);

Review Comment:
   ```suggestion
                   let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
                   let offset_index = index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
   
                   columns_indexes.push(c);
                   offset_indexes.push(p);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911943811


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -248,21 +286,23 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         }
 
         if options.enable_page_index {
-            //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
-            //support multi after create multi-RG test data.
-            let cols = metadata.row_group(0);
-            let columns_indexes =
-                index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
-            let pages_locations =
-                index_reader::read_pages_locations(&chunk_reader, cols.columns())?;
+            let mut columns_indexes = vec![];
+            let mut offset_indexes = vec![];
+            for rg in &filtered_row_groups {
+                let c = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
+                let p = index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
+
+                columns_indexes.push(c);
+                offset_indexes.push(p);

Review Comment:
   ```suggestion
                   let column_index = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;
                   let offset_index = index_reader::read_pages_locations(&chunk_reader, rg.columns())?;
   
                   columns_indexes.push(column_index);
                   offset_indexes.push(offset_index);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911719190


##########
parquet/src/column/reader.rs:
##########
@@ -160,9 +163,14 @@ where
             num_buffered_values: 0,
             num_decoded_values: 0,
             values_decoder,
+            selected_row_ranges: None,
         }
     }
 
+    pub(crate) fn set_row_ranges(&mut self, row_ranges: RowRanges) {
+        self.selected_row_ranges = Some(row_ranges);

Review Comment:
   need this row_ranges for row align in future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911925669


##########
parquet/src/file/page_index/filer_offset_index.rs:
##########
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // read from parquet file which before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // use to keep needed page index.
+    index_map: Vec<usize>,
+}
+
+pub(crate) type OffsetRange = (Vec<usize>, Vec<usize>);
+
+impl FilterOffsetIndex {
+    pub(crate) fn try_new(
+        offset_index: &[PageLocation],
+        ranges: &RowRanges,
+        total_row_count: i64,
+    ) -> Self {
+        let mut index = vec![];
+        for i in 0..offset_index.len() {
+            let page_location: &PageLocation = &offset_index[i];
+            let page_range = if i == offset_index.len() - 1 {
+                Range::new(
+                    page_location.first_row_index as usize,
+                    total_row_count as usize,
+                )
+            } else {
+                let next_page_location: &PageLocation = &offset_index[i + 1];
+                Range::new(
+                    page_location.first_row_index as usize,
+                    (next_page_location.first_row_index - 1) as usize,
+                )
+            };
+            if ranges.is_overlapping(&page_range) {
+                index.push(i);
+            }
+        }
+
+        FilterOffsetIndex {
+            offset_index: offset_index.to_vec(),
+            index_map: index,
+        }
+    }
+
+    pub(crate) fn get_page_count(&self) -> usize {
+        self.index_map.len()
+    }
+
+    pub(crate) fn get_offset(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index.get(index as usize).unwrap().offset
+    }
+
+    pub(crate) fn get_compressed_page_size(&self, page_index: usize) -> i32 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .compressed_page_size
+    }
+
+    pub(crate) fn get_first_row_index(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .first_row_index
+    }
+
+    pub(crate) fn get_last_row_index(
+        &self,
+        page_index: usize,
+        total_row_count: i64,
+    ) -> i64 {
+        let next_index = self.index_map[page_index] + 1;
+        if next_index >= self.get_page_count() {
+            total_row_count
+        } else {
+            self.offset_index
+                .get(next_index as usize)
+                .unwrap()
+                .first_row_index
+                - 1
+        }
+    }
+
+    // Return the offset of needed both data page and dictionary page.
+    // need input `row_group_offset` as input for checking if there is one dictionary page
+    // in one column chunk.
+    // Note: If data pages are adjacent, will merge them to one `OffsetRange`,
+    // for reduce seek effect.
+    pub(crate) fn calculate_offset_range(&self, row_group_offset: i64) -> OffsetRange {
+        let mut start_list = vec![];
+        let mut length_list = vec![];
+        let page_count = self.get_page_count();
+        if page_count > 0 {
+            //Not after filter page offset
+            let first_page_offset = self.offset_index[0].offset;
+            // add dictionary page if required

Review Comment:
   You can make it clear by change name and type of parameter.
   like:
   ```
   calculate_offset_range(&self, directory_page_offset: Option<i64>)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911932308


##########
parquet/src/file/reader.rs:
##########
@@ -86,6 +95,14 @@ pub trait RowGroupReader: Send + Sync {
     /// Get page reader for the `i`th column chunk.
     fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
 
+    /// Get page reader for the `i`th column chunk with offset index(in one row group).
+    /// `row_group_pages_offset_index` construct from pageIndex and input rowRanges, here we skip needless page.
+    fn get_column_page_reader_with_offset_index(
+        &self,
+        column_index: usize,
+        row_group_pages_offset_index: &[FilterOffsetIndex],

Review Comment:
   ```suggestion
         page_offset_index : &FilterOffsetIndex,
   ```
   a reference of struct is enough.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911925669


##########
parquet/src/file/page_index/filer_offset_index.rs:
##########
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // read from parquet file which before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // use to keep needed page index.
+    index_map: Vec<usize>,
+}
+
+pub(crate) type OffsetRange = (Vec<usize>, Vec<usize>);
+
+impl FilterOffsetIndex {
+    pub(crate) fn try_new(
+        offset_index: &[PageLocation],
+        ranges: &RowRanges,
+        total_row_count: i64,
+    ) -> Self {
+        let mut index = vec![];
+        for i in 0..offset_index.len() {
+            let page_location: &PageLocation = &offset_index[i];
+            let page_range = if i == offset_index.len() - 1 {
+                Range::new(
+                    page_location.first_row_index as usize,
+                    total_row_count as usize,
+                )
+            } else {
+                let next_page_location: &PageLocation = &offset_index[i + 1];
+                Range::new(
+                    page_location.first_row_index as usize,
+                    (next_page_location.first_row_index - 1) as usize,
+                )
+            };
+            if ranges.is_overlapping(&page_range) {
+                index.push(i);
+            }
+        }
+
+        FilterOffsetIndex {
+            offset_index: offset_index.to_vec(),
+            index_map: index,
+        }
+    }
+
+    pub(crate) fn get_page_count(&self) -> usize {
+        self.index_map.len()
+    }
+
+    pub(crate) fn get_offset(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index.get(index as usize).unwrap().offset
+    }
+
+    pub(crate) fn get_compressed_page_size(&self, page_index: usize) -> i32 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .compressed_page_size
+    }
+
+    pub(crate) fn get_first_row_index(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .first_row_index
+    }
+
+    pub(crate) fn get_last_row_index(
+        &self,
+        page_index: usize,
+        total_row_count: i64,
+    ) -> i64 {
+        let next_index = self.index_map[page_index] + 1;
+        if next_index >= self.get_page_count() {
+            total_row_count
+        } else {
+            self.offset_index
+                .get(next_index as usize)
+                .unwrap()
+                .first_row_index
+                - 1
+        }
+    }
+
+    // Return the offset of needed both data page and dictionary page.
+    // need input `row_group_offset` as input for checking if there is one dictionary page
+    // in one column chunk.
+    // Note: If data pages are adjacent, will merge them to one `OffsetRange`,
+    // for reduce seek effect.
+    pub(crate) fn calculate_offset_range(&self, row_group_offset: i64) -> OffsetRange {
+        let mut start_list = vec![];
+        let mut length_list = vec![];
+        let page_count = self.get_page_count();
+        if page_count > 0 {
+            //Not after filter page offset
+            let first_page_offset = self.offset_index[0].offset;
+            // add dictionary page if required

Review Comment:
   You can make it clear by change name and type of parameter.
   like:
   ```
   calculate_offset_range(&self, directory_page_offset: i64)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang closed pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang closed pull request #1977: Enable serialized_reader read specific Page by passing row ranges.
URL: https://github.com/apache/arrow-rs/pull/1977


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
tustvold commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911963350


##########
parquet/src/file/page_index/filer_offset_index.rs:
##########
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // read from parquet file which before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // use to keep needed page index.

Review Comment:
   Why is this necessary?



##########
parquet/src/file/serialized_reader.rs:
##########
@@ -82,6 +91,27 @@ impl ChunkReader for Bytes {
         let start = start as usize;
         Ok(self.slice(start..start + length).reader())
     }
+
+    fn get_multi_range_read(
+        &self,
+        start_list: Vec<usize>,
+        length_list: Vec<usize>,
+    ) -> Result<Self::T> {
+        if start_list.len() != length_list.len() {
+            return Err(general_err!(
+                "Actual start_list size doesn't match the length_list size ({} vs {})",
+                start_list.len(),
+                length_list.len()
+            ));
+        } else {
+            let mut combine_vec: Vec<u8> = vec![];
+            for (start, length) in start_list.into_iter().zip(length_list.into_iter()) {
+                combine_vec.extend(self.slice(start..start + length).to_vec());
+            }
+            let reader = Bytes::copy_from_slice(combine_vec.as_slice()).reader();

Review Comment:
   This adds an additional copy of all the page bytes, which is definitely not ideal...



##########
parquet/src/file/reader.rs:
##########
@@ -48,6 +49,14 @@ pub trait ChunkReader: Length + Send + Sync {
     /// get a serialy readeable slice of the current reader
     /// This should fail if the slice exceeds the current bounds
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
+
+    /// get a serially readable slice of the current reader
+    /// This should fail if the slice exceeds the current bounds
+    fn get_multi_range_read(

Review Comment:
   As discussed on #1955  I'm not a fan of this, I would much rather the page reader reads pages, than skipping byte ranges behind its back.
   
   It also changes the semantics of how a column chunk is read, as it now buffers in memory an extra time



##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -88,10 +98,15 @@ impl ArrowReaderOptions {
     ///
 
     /// Set `skip_arrow_metadata` to true, to skip decoding this
-    pub fn with_skip_arrow_metadata(self, skip_arrow_metadata: bool) -> Self {
-        Self {
-            skip_arrow_metadata,
-        }
+    pub fn with_skip_arrow_metadata(mut self, skip_arrow_metadata: bool) -> Self {
+        self.skip_arrow_metadata = skip_arrow_metadata;
+        self

Review Comment:
   ```suggestion
           {
               skip_arrow_metadata,
               ..self
           }
   ```
   
   And same below



##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -130,17 +145,73 @@ impl ArrowReader for ParquetFileArrowReader {
         mask: ProjectionMask,
         batch_size: usize,
     ) -> Result<ParquetRecordBatchReader> {
-        let array_reader = build_array_reader(
-            self.file_reader
-                .metadata()
-                .file_metadata()
-                .schema_descr_ptr(),
-            Arc::new(self.get_schema()?),
-            mask,
-            Box::new(self.file_reader.clone()),
-        )?;
+        if self.options.selected_rows.is_some() {
+            let ranges = &self.options.selected_rows.as_ref().unwrap().clone();

Review Comment:
   ```suggestion
           if let Some(ranges) = self.options.selected_rows.as_ref()
   ```



##########
parquet/src/arrow/arrow_reader.rs:
##########
@@ -68,11 +70,19 @@ pub trait ArrowReader {
         mask: ProjectionMask,
         batch_size: usize,
     ) -> Result<Self::RecordReader>;
+
+    fn get_record_reader_by_columns_and_row_ranges(

Review Comment:
   Do we need this, or is the ArrowReaderOptions sufficient?



##########
parquet/src/file/metadata.rs:
##########
@@ -55,8 +55,8 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
-    page_indexes: Option<Vec<Index>>,
-    offset_indexes: Option<Vec<Vec<PageLocation>>>,
+    page_indexes: Option<Vec<Vec<Index>>>,

Review Comment:
   ```suggestion
       /// Page index for all pages in each column chunk
       page_indexes: Option<Vec<Vec<Index>>>,
   ```
   Or something like that, same for the below



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r910989090


##########
parquet/src/file/metadata.rs:
##########
@@ -52,8 +52,8 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
-    page_indexes: Option<Vec<Index>>,
-    offset_indexes: Option<Vec<Vec<PageLocation>>>,
+    page_indexes: Option<Vec<Vec<Index>>>,
+    offset_indexes: Option<Vec<Vec<Vec<PageLocation>>>>,

Review Comment:
   Need 3 level `vec`: row_group -> column chunck -> page 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911719190


##########
parquet/src/column/reader.rs:
##########
@@ -160,9 +163,14 @@ where
             num_buffered_values: 0,
             num_decoded_values: 0,
             values_decoder,
+            selected_row_ranges: None,
         }
     }
 
+    pub(crate) fn set_row_ranges(&mut self, row_ranges: RowRanges) {
+        self.selected_row_ranges = Some(row_ranges);

Review Comment:
   Todo need this row_ranges for row align



##########
parquet/src/file/serialized_reader.rs:
##########
@@ -353,6 +392,31 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for SerializedRowGroupReader<'
         Ok(Box::new(page_reader))
     }
 
+    fn get_column_page_reader_with_offset_index(

Review Comment:
   Cause of lack test data in sub project in `parquet-testing`, will add end to end test after add test file in it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1175050415

   Marking as a draft, as I think the approach in #1998 is what we will take forward


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1172058992

   @tustvold @viirya  others may interests in this, PTAL😊


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r910977280


##########
parquet/src/arrow/array_reader/builder.rs:
##########
@@ -39,33 +39,46 @@ use crate::data_type::{
     Int96Type,
 };
 use crate::errors::Result;
+use crate::file::filer_offset_index::FilterOffsetIndex;
 use crate::schema::types::{ColumnDescriptor, ColumnPath, SchemaDescPtr, Type};
 
 /// Create array reader from parquet schema, projection mask, and parquet file reader.
+/// 'row_groups_filter_offset_index' is optional for reducing useless IO
+/// by filtering needless page.
 pub fn build_array_reader(
     parquet_schema: SchemaDescPtr,
     arrow_schema: SchemaRef,
     mask: ProjectionMask,
     row_groups: Box<dyn RowGroupCollection>,
+    row_groups_filter_offset_index: Option<Vec<Vec<FilterOffsetIndex>>>,
 ) -> Result<Box<dyn ArrayReader>> {
     let field =
         convert_schema(parquet_schema.as_ref(), mask, Some(arrow_schema.as_ref()))?;
 
     match &field {
-        Some(field) => build_reader(field, row_groups.as_ref()),
+        Some(field) => build_reader(
+            field,
+            row_groups.as_ref(),
+            row_groups_filter_offset_index.as_ref(),
+        ),
         None => Ok(make_empty_array_reader(row_groups.num_rows())),
     }
 }
 
 fn build_reader(
     field: &ParquetField,
     row_groups: &dyn RowGroupCollection,
+    row_groups_filter_offset_index: Option<&Vec<Vec<FilterOffsetIndex>>>,
 ) -> Result<Box<dyn ArrayReader>> {
     match field.field_type {
-        ParquetFieldType::Primitive { .. } => build_primitive_reader(field, row_groups),
+        ParquetFieldType::Primitive { .. } => {
+            build_primitive_reader(field, row_groups, row_groups_filter_offset_index)

Review Comment:
   For now just support primitive_reader



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1171837177

   I'm confused about the design of the new API described above
   ```
   fn get_record_reader_by_columns_and_row_ranges(
           &mut self,
           mask: ProjectionMask,
           row_ranges: &RowRanges,
           batch_size: usize,
       ) -> Result<ParquetRecordBatchReader> {
   ```
   I think column index reader should be a function for parquet reader or parquet-rs, any one who call the parquet reader should get the benefit from this optimization with a filter.
   
   From your implementation, I find user need to call the lower api and use the column index to calculate the `ranges`.  If so, Any user who want to use the column index of the parquet should add complex custom logic to fit this lower interface.
   What is your option? 
   @sunchao  @tustvold @viirya 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911932308


##########
parquet/src/file/reader.rs:
##########
@@ -86,6 +95,14 @@ pub trait RowGroupReader: Send + Sync {
     /// Get page reader for the `i`th column chunk.
     fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
 
+    /// Get page reader for the `i`th column chunk with offset index(in one row group).
+    /// `row_group_pages_offset_index` construct from pageIndex and input rowRanges, here we skip needless page.
+    fn get_column_page_reader_with_offset_index(
+        &self,
+        column_index: usize,
+        row_group_pages_offset_index: &[FilterOffsetIndex],

Review Comment:
   ```suggestion
         page_offset_index : &FilterOffsetIndex,
   ```
   a reference of struct is enough.
   I can't figure out a good name for parameter. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r910980736


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -452,7 +454,12 @@ impl RowGroupCollection for InMemoryRowGroup {
         self.row_count
     }
 
-    fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
+    fn column_chunks(
+        &self,
+        i: usize,
+        row_groups_filter_offset_index: Option<&Vec<FilterOffsetIndex>>,
+    ) -> Result<Box<dyn PageIterator>> {
+        //todo support page level filter

Review Comment:
   will support in InMemoryReader



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1184749541

   @Ted-Jiang Can this be closed now?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1172493031

   I think small incremental PRs is a good approach. However, I have concerns with this specific PR:
   
   * It introduces public APIs that don't have clear semantics (the skipped rows are somewhat arbitrary)
   * I would prefer an approach that collocates the page and row skipping logic, instead of treating them as separate concerns. Once RecordReader is skipping rows it will be incredibly confusing if pages are being skipped somewhere else in addition
   
   I wonder if a plan of attack akin to the following might work:
   
   * Add a skip page function to SerializedPageReader that uses the column index to skip the next page without reading it (we may need to change it to take ChunkReader instead of Read)
   * Same as the above for InMemoryPageReader
   * Add the ability to skip decoding rows to ColumnValueDecoder
   * Pass index and row selection down to RecordReader
   * Perform skipping
   
   Currently it feels like we're adding the high-level functionality before the necessary lower level functionality exists...


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911917693


##########
parquet/src/file/page_index/filer_offset_index.rs:
##########
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // read from parquet file which before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // use to keep needed page index.
+    index_map: Vec<usize>,
+}
+
+pub(crate) type OffsetRange = (Vec<usize>, Vec<usize>);
+
+impl FilterOffsetIndex {
+    pub(crate) fn try_new(
+        offset_index: &[PageLocation],
+        ranges: &RowRanges,
+        total_row_count: i64,
+    ) -> Self {
+        let mut index = vec![];
+        for i in 0..offset_index.len() {
+            let page_location: &PageLocation = &offset_index[i];
+            let page_range = if i == offset_index.len() - 1 {
+                Range::new(
+                    page_location.first_row_index as usize,
+                    total_row_count as usize,
+                )
+            } else {
+                let next_page_location: &PageLocation = &offset_index[i + 1];
+                Range::new(
+                    page_location.first_row_index as usize,
+                    (next_page_location.first_row_index - 1) as usize,
+                )
+            };
+            if ranges.is_overlapping(&page_range) {
+                index.push(i);
+            }
+        }
+
+        FilterOffsetIndex {
+            offset_index: offset_index.to_vec(),
+            index_map: index,
+        }
+    }
+
+    pub(crate) fn get_page_count(&self) -> usize {
+        self.index_map.len()
+    }
+
+    pub(crate) fn get_offset(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index.get(index as usize).unwrap().offset
+    }
+
+    pub(crate) fn get_compressed_page_size(&self, page_index: usize) -> i32 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .compressed_page_size
+    }
+
+    pub(crate) fn get_first_row_index(&self, page_index: usize) -> i64 {
+        let index = self.index_map[page_index];
+        self.offset_index
+            .get(index as usize)
+            .unwrap()
+            .first_row_index
+    }
+
+    pub(crate) fn get_last_row_index(
+        &self,
+        page_index: usize,
+        total_row_count: i64,
+    ) -> i64 {
+        let next_index = self.index_map[page_index] + 1;
+        if next_index >= self.get_page_count() {
+            total_row_count
+        } else {
+            self.offset_index
+                .get(next_index as usize)
+                .unwrap()
+                .first_row_index
+                - 1
+        }
+    }
+
+    // Return the offset of needed both data page and dictionary page.
+    // need input `row_group_offset` as input for checking if there is one dictionary page
+    // in one column chunk.
+    // Note: If data pages are adjacent, will merge them to one `OffsetRange`,
+    // for reduce seek effect.
+    pub(crate) fn calculate_offset_range(&self, row_group_offset: i64) -> OffsetRange {
+        let mut start_list = vec![];
+        let mut length_list = vec![];
+        let page_count = self.get_page_count();
+        if page_count > 0 {
+            //Not after filter page offset
+            let first_page_offset = self.offset_index[0].offset;
+            // add dictionary page if required

Review Comment:
   Why do you assume that the dict page is in the first page of the column chunk?
   
   I find the definition from https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L542



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r912083556


##########
parquet/src/file/page_index/filer_offset_index.rs:
##########
@@ -0,0 +1,293 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file::page_index::range::{Range, RowRanges};
+use parquet_format::PageLocation;
+
+/// Returns the filtered offset index containing only the pages which are overlapping with rowRanges.
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct FilterOffsetIndex {
+    // read from parquet file which before the footer.
+    offset_index: Vec<PageLocation>,
+
+    // use to keep needed page index.

Review Comment:
   We pass all `PageLocation` and `RowRanges` in this struct then do the filter logic.
   
   if we have 5 pages, in `try_new`, we filter 2 pages and keep these 3 pages index_numbers in this `index_map` for final `calculate_offset_range`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911938903


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -248,21 +286,23 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         }
 
         if options.enable_page_index {
-            //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
-            //support multi after create multi-RG test data.
-            let cols = metadata.row_group(0);
-            let columns_indexes =
-                index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
-            let pages_locations =
-                index_reader::read_pages_locations(&chunk_reader, cols.columns())?;
+            let mut columns_indexes = vec![];

Review Comment:
   Column index should be loaded by lazy.
   From the open/read the parquet file, we just load all the column index of the filtered rowgroup.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911934370


##########
parquet/src/arrow/array_reader/mod.rs:
##########
@@ -102,8 +109,16 @@ impl RowGroupCollection for Arc<dyn FileReader> {
         self.metadata().file_metadata().num_rows() as usize
     }
 
-    fn column_chunks(&self, column_index: usize) -> Result<Box<dyn PageIterator>> {
-        let iterator = FilePageIterator::new(column_index, Arc::clone(self))?;
+    fn column_chunks(
+        &self,
+        i: usize,

Review Comment:
   ```suggestion
           column_index: usize,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] liukun4515 commented on a diff in pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on code in PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#discussion_r911945307


##########
parquet/src/file/serialized_reader.rs:
##########
@@ -248,21 +286,23 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
         }
 
         if options.enable_page_index {
-            //Todo for now test data `data_index_bloom_encoding_stats.parquet` only have one rowgroup
-            //support multi after create multi-RG test data.
-            let cols = metadata.row_group(0);
-            let columns_indexes =
-                index_reader::read_columns_indexes(&chunk_reader, cols.columns())?;
-            let pages_locations =
-                index_reader::read_pages_locations(&chunk_reader, cols.columns())?;
+            let mut columns_indexes = vec![];
+            let mut offset_indexes = vec![];
+            for rg in &filtered_row_groups {
+                let c = index_reader::read_columns_indexes(&chunk_reader, rg.columns())?;

Review Comment:
   If a schema has `co1,col2,col3.....col8`, and we just need the `col1` and `col3`, do we need to load other useless index data?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] tustvold commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
tustvold commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1172541578

   > I got worried about where should i pass the column index info
   
   Give me a day or so and I'll get a PR up with some stuff stubbed out, I think this exercise will help us both :smile: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] Ted-Jiang commented on pull request #1977: Enable serialized_reader read specific Page by passing row ranges.

Posted by GitBox <gi...@apache.org>.
Ted-Jiang commented on PR #1977:
URL: https://github.com/apache/arrow-rs/pull/1977#issuecomment-1172057493

   > should be calculated during the predicate pushdown.
   
   Yes, i agree it's better keep it private.
   But in current code base we not have a `filter` struct like in java to provide predicate pushdown.
   Like datafusion use 'mask' (level for column), 'row ranges' is like a *page_mask* 
   ```
     fn get_record_reader_by_columns(
           &mut self,
           mask: ProjectionMask,
           batch_size: usize,
       ) -> Result<ParquetRecordBatchReader> {
   ```
   I think for now, we make it pub, after full support for predicate pushdown, we will do like in java.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org