You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/08/20 14:26:57 UTC
[arrow-rs] branch master updated: Use offset index in ParquetRecordBatchStream (#2526)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 1eb6c45e4 Use offset index in ParquetRecordBatchStream (#2526)
1eb6c45e4 is described below
commit 1eb6c45e48a7462f1492193d0ada46484191d58a
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Sat Aug 20 10:26:52 2022 -0400
Use offset index in ParquetRecordBatchStream (#2526)
* Use offset index in ParquetRecordBatchStream
* remove debugging cruft and fix clippy warning
* Do not use ReadOptions
* Fix bug with dictionary pages
* Review comments
* Fix bug in page skipping logic
---
parquet/src/arrow/arrow_reader/mod.rs | 2 +-
parquet/src/arrow/arrow_reader/selection.rs | 33 +++-
parquet/src/arrow/async_reader.rs | 235 +++++++++++++++++++++++++++-
parquet/src/column/reader.rs | 2 +-
parquet/src/file/page_index/index_reader.rs | 16 +-
5 files changed, 273 insertions(+), 15 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs b/parquet/src/arrow/arrow_reader/mod.rs
index d8eb441f5..74fff9935 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -207,7 +207,7 @@ pub trait ArrowReader {
#[derive(Debug, Clone, Default)]
pub struct ArrowReaderOptions {
skip_arrow_metadata: bool,
- page_index: bool,
+ pub(crate) page_index: bool,
}
impl ArrowReaderOptions {
diff --git a/parquet/src/arrow/arrow_reader/selection.rs b/parquet/src/arrow/arrow_reader/selection.rs
index f3d11d925..b6ee273ab 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -162,7 +162,12 @@ impl RowSelection {
current_selector = selectors.next();
}
} else {
- break;
+ if !(selector.skip || current_page_included) {
+ let start = page.offset as usize;
+ let end = start + page.compressed_page_size as usize;
+ ranges.push(start..end);
+ }
+ current_selector = selectors.next()
}
}
@@ -564,5 +569,31 @@ mod tests {
// assert_eq!(mask, vec![false, true, true, false, true, true, true]);
assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
+
+ let selection = RowSelection::from(vec![
+ // Skip first page
+ RowSelector::skip(10),
+ // Multiple selects in same page
+ RowSelector::select(3),
+ RowSelector::skip(3),
+ RowSelector::select(4),
+ // Select to page boundary
+ RowSelector::skip(5),
+ RowSelector::select(5),
+ // Skip full page past page boundary
+ RowSelector::skip(12),
+ // Select to final page boundary
+ RowSelector::select(12),
+ RowSelector::skip(1),
+ // Skip across final page boundary
+ RowSelector::skip(8),
+ // Select from final page
+ RowSelector::select(4),
+ ]);
+
+ let ranges = selection.scan_ranges(&index);
+
+ // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
+ assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
}
}
diff --git a/parquet/src/arrow/async_reader.rs b/parquet/src/arrow/async_reader.rs
index 7bcc55038..b0d9143d6 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,7 +78,7 @@
use std::collections::VecDeque;
use std::fmt::Formatter;
-use std::io::SeekFrom;
+use std::io::{Cursor, SeekFrom};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
@@ -88,6 +88,8 @@ use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
+use parquet_format::OffsetIndex;
+use thrift::protocol::TCompactInputProtocol;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
@@ -96,8 +98,8 @@ use arrow::record_batch::RecordBatch;
use crate::arrow::array_reader::{build_array_reader, RowGroupCollection};
use crate::arrow::arrow_reader::{
- evaluate_predicate, selects_any, ArrowReaderBuilder, ParquetRecordBatchReader,
- RowFilter, RowSelection,
+ evaluate_predicate, selects_any, ArrowReaderBuilder, ArrowReaderOptions,
+ ParquetRecordBatchReader, RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;
@@ -108,6 +110,7 @@ use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+use crate::file::page_index::index_reader;
use crate::file::FOOTER_SIZE;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
@@ -218,6 +221,96 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}
+ pub async fn new_with_options(
+ mut input: T,
+ options: ArrowReaderOptions,
+ ) -> Result<Self> {
+ let mut metadata = input.get_metadata().await?;
+
+ if options.page_index
+ && metadata
+ .page_indexes()
+ .zip(metadata.offset_indexes())
+ .is_none()
+ {
+ let mut fetch_ranges = vec![];
+ let mut index_lengths: Vec<Vec<usize>> = vec![];
+
+ for rg in metadata.row_groups() {
+ let (loc_offset, loc_length) =
+ index_reader::get_location_offset_and_total_length(rg.columns())?;
+
+ let (idx_offset, idx_lengths) =
+ index_reader::get_index_offset_and_lengths(rg.columns())?;
+ let idx_length = idx_lengths.iter().sum::<usize>();
+
+ // If index data is missing, return without any indexes
+ if loc_length == 0 || idx_length == 0 {
+ return Self::new_builder(AsyncReader(input), metadata, options);
+ }
+
+ fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
+ fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length);
+ index_lengths.push(idx_lengths);
+ }
+
+ let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter();
+ let mut index_lengths = index_lengths.into_iter();
+
+ let mut row_groups = metadata.row_groups().to_vec();
+
+ let mut columns_indexes = vec![];
+ let mut offset_indexes = vec![];
+
+ for rg in row_groups.iter_mut() {
+ let columns = rg.columns();
+
+ let location_data = chunks.next().unwrap();
+ let mut cursor = Cursor::new(location_data);
+ let mut offset_index = vec![];
+
+ for _ in 0..columns.len() {
+ let mut prot = TCompactInputProtocol::new(&mut cursor);
+ let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+ offset_index.push(offset.page_locations);
+ }
+
+ rg.set_page_offset(offset_index.clone());
+ offset_indexes.push(offset_index);
+
+ let index_data = chunks.next().unwrap();
+ let index_lengths = index_lengths.next().unwrap();
+
+ let mut start = 0;
+ let data = index_lengths.into_iter().map(|length| {
+ let r = index_data.slice(start..start + length);
+ start += length;
+ r
+ });
+
+ let indexes = rg
+ .columns()
+ .iter()
+ .zip(data)
+ .map(|(column, data)| {
+ let column_type = column.column_type();
+ index_reader::deserialize_column_index(&data, column_type)
+ })
+ .collect::<Result<Vec<_>>>()?;
+ columns_indexes.push(indexes);
+ }
+
+ metadata = Arc::new(ParquetMetaData::new_with_page_index(
+ metadata.file_metadata().clone(),
+ row_groups,
+ Some(columns_indexes),
+ Some(offset_indexes),
+ ));
+ }
+
+ Self::new_builder(AsyncReader(input), metadata, options)
+ }
+
/// Build a new [`ParquetRecordBatchStream`]
pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
let num_row_groups = self.metadata.row_groups().len();
@@ -493,13 +586,26 @@ impl<'a> InMemoryRowGroup<'a> {
let fetch_ranges = self
.column_chunks
.iter()
+ .zip(self.metadata.columns())
.enumerate()
.into_iter()
- .filter_map(|(idx, chunk)| {
+ .filter_map(|(idx, (chunk, chunk_meta))| {
(chunk.is_none() && projection.leaf_included(idx)).then(|| {
- let ranges = selection.scan_ranges(&page_locations[idx]);
+ // If the first page does not start at the beginning of the column,
+ // then we need to also fetch a dictionary page.
+ let mut ranges = vec![];
+ let (start, _len) = chunk_meta.byte_range();
+ match page_locations[idx].first() {
+ Some(first) if first.offset as u64 != start => {
+ ranges.push(start as usize..first.offset as usize);
+ }
+ _ => (),
+ }
+
+ ranges.extend(selection.scan_ranges(&page_locations[idx]));
page_start_offsets
.push(ranges.iter().map(|range| range.start).collect());
+
ranges
})
})
@@ -687,7 +793,6 @@ mod tests {
use crate::file::page_index::index_reader;
use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::error::Result as ArrowResult;
-
use futures::TryStreamExt;
use std::sync::Mutex;
@@ -763,6 +868,70 @@ mod tests {
);
}
+ #[tokio::test]
+ async fn test_async_reader_with_index() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+ let data = Bytes::from(std::fs::read(path).unwrap());
+
+ let metadata = parse_metadata(&data).unwrap();
+ let metadata = Arc::new(metadata);
+
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let async_reader = TestReader {
+ data: data.clone(),
+ metadata: metadata.clone(),
+ requests: Default::default(),
+ };
+
+ let options = ArrowReaderOptions::new().with_page_index(true);
+ let builder =
+ ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+ .await
+ .unwrap();
+
+ // The builder should have page and offset indexes loaded now
+ let metadata_with_index = builder.metadata();
+
+ // Check offset indexes are present for all columns
+ for rg in metadata_with_index.row_groups() {
+ let page_locations = rg
+ .page_offset_index()
+ .as_ref()
+ .expect("expected page offset index");
+ assert_eq!(page_locations.len(), rg.columns().len())
+ }
+
+ // Check page indexes are present for all columns
+ let page_indexes = metadata_with_index
+ .page_indexes()
+ .expect("expected page indexes");
+ for (idx, rg) in metadata_with_index.row_groups().iter().enumerate() {
+ assert_eq!(page_indexes[idx].len(), rg.columns().len())
+ }
+
+ let mask = ProjectionMask::leaves(builder.parquet_schema(), vec![1, 2]);
+ let stream = builder
+ .with_projection(mask.clone())
+ .with_batch_size(1024)
+ .build()
+ .unwrap();
+
+ let async_batches: Vec<_> = stream.try_collect().await.unwrap();
+
+ let sync_batches = ParquetRecordBatchReaderBuilder::try_new(data)
+ .unwrap()
+ .with_projection(mask)
+ .with_batch_size(1024)
+ .build()
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+
+ assert_eq!(async_batches, sync_batches);
+ }
+
#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -832,6 +1001,56 @@ mod tests {
assert_eq!(requests.lock().unwrap().len(), 3);
}
+ #[tokio::test]
+ async fn test_row_filter_with_index() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/alltypes_tiny_pages_plain.parquet", testdata);
+ let data = Bytes::from(std::fs::read(path).unwrap());
+
+ let metadata = parse_metadata(&data).unwrap();
+ let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+ let metadata = Arc::new(metadata);
+
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let async_reader = TestReader {
+ data: data.clone(),
+ metadata: metadata.clone(),
+ requests: Default::default(),
+ };
+
+ let a_filter = ArrowPredicateFn::new(
+ ProjectionMask::leaves(&parquet_schema, vec![1]),
+ |batch| arrow::compute::eq_dyn_bool_scalar(batch.column(0), true),
+ );
+
+ let b_filter = ArrowPredicateFn::new(
+ ProjectionMask::leaves(&parquet_schema, vec![2]),
+ |batch| arrow::compute::eq_dyn_scalar(batch.column(0), 2_i32),
+ );
+
+ let filter = RowFilter::new(vec![Box::new(a_filter), Box::new(b_filter)]);
+
+ let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 2]);
+
+ let options = ArrowReaderOptions::new().with_page_index(true);
+ let stream =
+ ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
+ .await
+ .unwrap()
+ .with_projection(mask.clone())
+ .with_batch_size(1024)
+ .with_row_filter(filter)
+ .build()
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = stream.try_collect().await.unwrap();
+
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+ assert_eq!(total_rows, 730);
+ }
+
#[tokio::test]
async fn test_in_memory_row_group_sparse() {
let testdata = arrow::util::test_util::parquet_test_data();
@@ -882,7 +1101,7 @@ mod tests {
let mut skip = true;
let mut pages = offset_index[0].iter().peekable();
- // Setup `RowSelection` so that we can skip every other page
+ // Setup `RowSelection` so that we can skip every other page, selecting the last page
let mut selectors = vec![];
let mut expected_page_requests: Vec<Range<usize>> = vec![];
while let Some(page) = pages.next() {
@@ -906,7 +1125,7 @@ mod tests {
let selection = RowSelection::from(selectors);
let (_factory, _reader) = reader_factory
- .read_row_group(0, Some(selection), projection, 48)
+ .read_row_group(0, Some(selection), projection.clone(), 48)
.await
.expect("reading row group");
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 1432c72b5..f96ccc3ea 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -312,7 +312,7 @@ where
// If page has less rows than the remaining records to
// be skipped, skip entire page
- if metadata.num_rows < remaining {
+ if metadata.num_rows <= remaining {
self.page_reader.skip_next_page()?;
remaining -= metadata.num_rows;
continue;
diff --git a/parquet/src/file/page_index/index_reader.rs b/parquet/src/file/page_index/index_reader.rs
index e3f37fbc6..e6a4e5981 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -34,8 +34,12 @@ pub fn read_columns_indexes<R: ChunkReader>(
let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
let length = lengths.iter().sum::<usize>();
+ if length == 0 {
+ return Ok(vec![Index::NONE; chunks.len()]);
+ }
+
//read all needed data into buffer
- let mut reader = reader.get_read(offset, reader.len() as usize)?;
+ let mut reader = reader.get_read(offset, length)?;
let mut data = vec![0; length];
reader.read_exact(&mut data)?;
@@ -64,6 +68,10 @@ pub fn read_pages_locations<R: ChunkReader>(
) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
+ if total_length == 0 {
+ return Ok(vec![]);
+ }
+
//read all needed data into buffer
let mut reader = reader.get_read(offset, total_length)?;
let mut data = vec![0; total_length];
@@ -82,7 +90,7 @@ pub fn read_pages_locations<R: ChunkReader>(
//Get File offsets of every ColumnChunk's page_index
//If there are invalid offset return a zero offset with empty lengths.
-fn get_index_offset_and_lengths(
+pub(crate) fn get_index_offset_and_lengths(
chunks: &[ColumnChunkMetaData],
) -> Result<(u64, Vec<usize>), ParquetError> {
let first_col_metadata = if let Some(chunk) = chunks.first() {
@@ -111,7 +119,7 @@ fn get_index_offset_and_lengths(
//Get File offset of ColumnChunk's pages_locations
//If there are invalid offset return a zero offset with zero length.
-fn get_location_offset_and_total_length(
+pub(crate) fn get_location_offset_and_total_length(
chunks: &[ColumnChunkMetaData],
) -> Result<(u64, usize), ParquetError> {
let metadata = if let Some(chunk) = chunks.first() {
@@ -133,7 +141,7 @@ fn get_location_offset_and_total_length(
Ok((offset, total_length))
}
-fn deserialize_column_index(
+pub(crate) fn deserialize_column_index(
data: &[u8],
column_type: Type,
) -> Result<Index, ParquetError> {