You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/20 14:26:57 UTC

[arrow-rs] branch master updated: Use offset index in ParquetRecordBatchStream (#2526)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 1eb6c45e4 Use offset index in ParquetRecordBatchStream (#2526)
1eb6c45e4 is described below

commit 1eb6c45e48a7462f1492193d0ada46484191d58a
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Sat Aug 20 10:26:52 2022 -0400

    Use offset index in ParquetRecordBatchStream (#2526)
    
    * Use offset index in ParquetRecordBatchStream
    
    * remove debugging cruft and fix clippy warning
    
    * Do not use ReadOptions
    
    * Fix bug with dictionary pages
    
    * Review comments
    
    * Fix bug in page skipping logic
---
 parquet/src/arrow/arrow_reader/mod.rs       |   2 +-
 parquet/src/arrow/arrow_reader/selection.rs |  33 +++-
 parquet/src/arrow/async_reader.rs           | 235 +++++++++++++++++++++++++++-
 parquet/src/column/reader.rs                |   2 +-
 parquet/src/file/page_index/index_reader.rs |  16 +-
 5 files changed, 273 insertions(+), 15 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index d8eb441f5..74fff9935 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -207,7 +207,7 @@ pub trait ArrowReader {
 #[derive(Debug, Clone, Default)]
 pub struct ArrowReaderOptions {
     skip_arrow_metadata: bool,
-    page_index: bool,
+    pub(crate) page_index: bool,
 }
 
 impl ArrowReaderOptions {
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index f3d11d925..b6ee273ab 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -162,7 +162,12 @@ impl RowSelection {
                     current_selector = selectors.next();
                 }
             } else {
-                break;
+                if !(selector.skip || current_page_included) {
+                    let start = page.offset as usize;
+                    let end = start + page.compressed_page_size as usize;
+                    ranges.push(start..end);
+                }
+                current_selector = selectors.next()
             }
         }
 
@@ -564,5 +569,31 @@ mod tests {
 
         // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
         assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
+
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select to final page boundary
+            RowSelector::select(12),
+            RowSelector::skip(1),
+            // Skip across final page boundary
+            RowSelector::skip(8),
+            // Select from final page
+            RowSelector::select(4),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
+
+        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
     }
 }
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 7bcc55038..b0d9143d6 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
 
-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
@@ -88,6 +88,8 @@ use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
+use parquet_format::OffsetIndex;
+use thrift::protocol::TCompactInputProtocol;
 
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
@@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch;
 
 use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
 use crate::arrow::arrow_reader::{
-    evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
-    RowFilter, RowSelection,
+    evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
+    ParquetRecordBatchReader, RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
 
@@ -108,6 +110,7 @@ use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
 use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
 
+use crate::file::page_index::index_reader;
 use crate::file::FOOTER_SIZE;
 
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -218,6 +221,96 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
         Self::new_builder(AsyncReader(input), metadata, Default::default())
     }
 
+    pub async fn new_with_options(
+        mut input: T,
+        options: ArrowReaderOptions,
+    ) -> Result<Self> {
+        let mut metadata = input.get_metadata().await?;
+
+        if options.page_index
+            && metadata
+                .page_indexes()
+                .zip(metadata.offset_indexes())
+                .is_none()
+        {
+            let mut fetch_ranges = vec![];
+            let mut index_lengths: Vec<Vec<usize>> = vec![];
+
+            for rg in metadata.row_groups() {
+                let (loc_offset, loc_length) =
+                    index_reader::get_location_offset_and_total_length(rg.columns())?;
+
+                let (idx_offset, idx_lengths) =
+                    index_reader::get_index_offset_and_lengths(rg.columns())?;
+                let idx_length = idx_lengths.iter().sum::<usize>();
+
+                // If index data is missing, return without any indexes
+                if loc_length == 0 || idx_length == 0 {
+                    return Self::new_builder(AsyncReader(input), metadata, options);
+                }
+
+                fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
+                fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length);
+                index_lengths.push(idx_lengths);
+            }
+
+            let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut index_lengths = index_lengths.into_iter();
+
+            let mut row_groups = metadata.row_groups().to_vec();
+
+            let mut columns_indexes = vec![];
+            let mut offset_indexes = vec![];
+
+            for rg in row_groups.iter_mut() {
+                let columns = rg.columns();
+
+                let location_data = chunks.next().unwrap();
+                let mut cursor = Cursor::new(location_data);
+                let mut offset_index = vec![];
+
+                for _ in 0..columns.len() {
+                    let mut prot = TCompactInputProtocol::new(&mut cursor);
+                    let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+                    offset_index.push(offset.page_locations);
+                }
+
+                rg.set_page_offset(offset_index.clone());
+                offset_indexes.push(offset_index);
+
+                let index_data = chunks.next().unwrap();
+                let index_lengths = index_lengths.next().unwrap();
+
+                let mut start = 0;
+                let data = index_lengths.into_iter().map(|length| {
+                    let r = index_data.slice(start..start + length);
+                    start += length;
+                    r
+                });
+
+                let indexes = rg
+                    .columns()
+                    .iter()
+                    .zip(data)
+                    .map(|(column, data)| {
+                        let column_type = column.column_type();
+                        index_reader::deserialize_column_index(&data, column_type)
+                    })
+                    .collect::<Result<Vec<_>>>()?;
+                columns_indexes.push(indexes);
+            }
+
+            metadata = Arc::new(ParquetMetaData::new_with_page_index(
+                metadata.file_metadata().clone(),
+                row_groups,
+                Some(columns_indexes),
+                Some(offset_indexes),
+            ));
+        }
+
+        Self::new_builder(AsyncReader(input), metadata, options)
+    }
+
     /// Build a new [`ParquetRecordBatchStream`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
         let num_row_groups = self.metadata.row_groups().len();
@@ -493,13 +586,26 @@ impl<'a> InMemoryRowGroup<'a> {
             let fetch_ranges = self
                 .column_chunks
                 .iter()
+                .zip(self.metadata.columns())
                 .enumerate()
                 .into_iter()
-                .filter_map(|(idx, chunk)| {
+                .filter_map(|(idx, (chunk, chunk_meta))| {
                     (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                        let ranges = selection.scan_ranges(&page_locations[idx]);
+                        // If the first page does not start at the beginning of the column,
+                        // then we need to also fetch a dictionary page.
+                        let mut ranges = vec![];
+                        let (start, _len) = chunk_meta.byte_range();
+                        match page_locations[idx].first() {
+                            Some(first) if first.offset as u64 != start => {
+                                ranges.push(start as usize..first.offset as usize);
+                            }
+                            _ => (),
+                        }
+
+                        ranges.extend(selection.scan_ranges(&page_locations[idx]));
                         page_start_offsets
                             .push(ranges.iter().map(|range| range.start).collect());
+
                         ranges
                     })
                 })
@@ -687,7 +793,6 @@ mod tests {
     use crate::file::page_index::index_reader;
     use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
-
     use futures::TryStreamExt;
     use std::sync::Mutex;
 
@@ -763,6 +868,70 @@ mod tests {
         );
     }
 
+    #[tokio::test]
+    async fn test_async_reader_with_index() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let builder =
+            ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+                .await
+                .unwrap();
+
+        // The builder should have page and offset indexes loaded now
+        let metadata_with_index = builder.metadata();
+
+        // Check offset indexes are present for all columns
+        for rg in metadata_with_index.row_groups() {
+            let page_locations = rg
+                .page_offset_index()
+                .as_ref()
+                .expect("expected page offset index");
+            assert_eq!(page_locations.len(), rg.columns().len())
+        }
+
+        // Check page indexes are present for all columns
+        let page_indexes = metadata_with_index
+            .page_indexes()
+            .expect("expected page indexes");
+        for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
+            assert_eq!(page_indexes[idx].len(), rg.columns().len())
+        }
+
+        let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+        let stream = builder
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .build()
+            .unwrap();
+
+        let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+        let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
+            .unwrap()
+            .with_projection(mask)
+            .with_batch_size(1024)
+            .build()
+            .unwrap()
+            .collect::<ArrowResult<Vec<_>>>()
+            .unwrap();
+
+        assert_eq!(async_batches, sync_batches);
+    }
+
     #[tokio::test]
     async fn test_row_filter() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -832,6 +1001,56 @@ mod tests {
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
 
+    #[tokio::test]
+    async fn test_row_filter_with_index() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+        let metadata = Arc::new(metadata);
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![1]),
+            |batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
+        );
+
+        let b_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![2]),
+            |batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+
+        let options = ArrowReaderOptions::new().with_page_index(true);
+        let stream =
+            ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+                .await
+                .unwrap()
+                .with_projection(mask.clone())
+                .with_batch_size(1024)
+                .with_row_filter(filter)
+                .build()
+                .unwrap();
+
+        let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+        assert_eq!(total_rows, 730);
+    }
+
     #[tokio::test]
     async fn test_in_memory_row_group_sparse() {
         let testdata = arrow::util::test_util::parquet_test_data();
@@ -882,7 +1101,7 @@ mod tests {
         let mut skip = true;
         let mut pages = offset_index[0].iter().peekable();
 
-        // Setup `RowSelection` so that we can skip every other page
+        // Setup `RowSelection` so that we can skip every other page, selecting the last page
         let mut selectors = vec![];
         let mut expected_page_requests: Vec<Range<usize>> = vec![];
         while let Some(page) = pages.next() {
@@ -906,7 +1125,7 @@ mod tests {
         let selection = RowSelection::from(selectors);
 
         let (_factory, _reader) = reader_factory
-            .read_row_group(0, Some(selection), projection, 48)
+            .read_row_group(0, Some(selection), projection.clone(), 48)
             .await
             .expect("reading row group");
 
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1432c72b5..f96ccc3ea 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -312,7 +312,7 @@ where
 
                 // If page has less rows than the remaining records to
                 // be skipped, skip entire page
-                if metadata.num_rows < remaining {
+                if metadata.num_rows <= remaining {
                     self.page_reader.skip_next_page()?;
                     remaining -= metadata.num_rows;
                     continue;
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index e3f37fbc6..e6a4e5981 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -34,8 +34,12 @@ pub fn read_columns_indexes<R: ChunkReader>(
     let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
     let length = lengths.iter().sum::<usize>();
 
+    if length == 0 {
+        return Ok(vec![Index::NONE; chunks.len()]);
+    }
+
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, length)?;
     let mut data = vec![0; length];
     reader.read_exact(&mut data)?;
 
@@ -64,6 +68,10 @@ pub fn read_pages_locations<R: ChunkReader>(
 ) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
 
+    if total_length == 0 {
+        return Ok(vec![]);
+    }
+
     //read all need data into buffer
     let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
@@ -82,7 +90,7 @@ pub fn read_pages_locations<R: ChunkReader>(
 
 //Get File offsets of every ColumnChunk's page_index
 //If there are invalid offset return a zero offset with empty lengths.
-fn get_index_offset_and_lengths(
+pub(crate) fn get_index_offset_and_lengths(
     chunks: &[ColumnChunkMetaData],
 ) -> Result<(u64, Vec<usize>), ParquetError> {
     let first_col_metadata = if let Some(chunk) = chunks.first() {
@@ -111,7 +119,7 @@ fn get_index_offset_and_lengths(
 
 //Get File offset of ColumnChunk's pages_locations
 //If there are invalid offset return a zero offset with zero length.
-fn get_location_offset_and_total_length(
+pub(crate) fn get_location_offset_and_total_length(
     chunks: &[ColumnChunkMetaData],
 ) -> Result<(u64, usize), ParquetError> {
     let metadata = if let Some(chunk) = chunks.first() {
@@ -133,7 +141,7 @@ fn get_location_offset_and_total_length(
     Ok((offset, total_length))
 }
 
-fn deserialize_column_index(
+pub(crate) fn deserialize_column_index(
     data: &[u8],
     column_type: Type,
 ) -> Result<Index, ParquetError> {