You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/17 12:00:43 UTC
[arrow-rs] branch master updated: Prefetch page index (#4090) (#4216)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 8580e858c Prefetch page index (#4090) (#4216)
8580e858c is described below
commit 8580e858c73eab442deb74d194af31385d78c95c
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed May 17 13:00:37 2023 +0100
Prefetch page index (#4090) (#4216)
* Prefetch page index (#4090)
* Clippy
* Docs
* Review feedback
* Tweak docs
---
parquet/src/arrow/async_reader/metadata.rs | 338 ++++++++++++++++++++++++-----
parquet/src/arrow/async_reader/mod.rs | 55 +----
parquet/src/arrow/async_reader/store.rs | 55 +++--
parquet/src/file/footer.rs | 13 +-
parquet/src/file/metadata.rs | 12 +
5 files changed, 340 insertions(+), 133 deletions(-)
diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs
index 7470814fa..076ae5c54 100644
--- a/parquet/src/arrow/async_reader/metadata.rs
+++ b/parquet/src/arrow/async_reader/metadata.rs
@@ -15,13 +15,216 @@
// specific language governing permissions and limitations
// under the License.
+use crate::arrow::async_reader::AsyncFileReader;
use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+ acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
use std::future::Future;
+use std::io::Read;
use std::ops::Range;
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+ fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+ fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+ self.get_bytes(range)
+ }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+///
+/// Crate-private until stabilised
+pub(crate) struct MetadataLoader<F> {
+ /// Function that fetches byte ranges asynchronously
+ fetch: F,
+ /// The in-progress metadata
+ metadata: ParquetMetaData,
+ /// The offset and bytes of remaining unparsed data
+ remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+ /// Create a new [`MetadataLoader`] by reading the footer information
+ ///
+ /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
+ pub async fn load(
+ mut fetch: F,
+ file_size: usize,
+ prefetch: Option<usize>,
+ ) -> Result<Self> {
+ if file_size < 8 {
+ return Err(ParquetError::EOF(format!(
+ "file size of {file_size} is less than footer"
+ )));
+ }
+
+ // If a size hint is provided, read more than the minimum size
+ // to try and avoid a second fetch.
+ let footer_start = if let Some(size_hint) = prefetch {
+ file_size.saturating_sub(size_hint)
+ } else {
+ file_size - 8
+ };
+
+ let suffix = fetch.fetch(footer_start..file_size).await?;
+ let suffix_len = suffix.len();
+
+ let mut footer = [0; 8];
+ footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+
+ let length = decode_footer(&footer)?;
+
+ if file_size < length + 8 {
+ return Err(ParquetError::EOF(format!(
+ "file size of {} is less than footer + metadata {}",
+ file_size,
+ length + 8
+ )));
+ }
+
+ // Did not fetch the entire file metadata in the initial read, need to make a second request
+ let (metadata, remainder) = if length > suffix_len - 8 {
+ let metadata_start = file_size - length - 8;
+ let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
+
+ let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+ (read_metadata(reader)?, None)
+ } else {
+ let metadata_start = file_size - length - 8 - footer_start;
+
+ let slice = &suffix[metadata_start..suffix_len - 8];
+ (
+ read_metadata(slice)?,
+ Some((footer_start, suffix.slice(..metadata_start))),
+ )
+ };
+
+ Ok(Self {
+ fetch,
+ metadata,
+ remainder,
+ })
+ }
+
+ /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
+ pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
+ Self {
+ fetch,
+ metadata,
+ remainder: None,
+ }
+ }
+
+ /// Loads the page index, if any
+ ///
+ /// * `column_index`: if true will load column index
+ /// * `offset_index`: if true will load offset index
+ pub async fn load_page_index(
+ &mut self,
+ column_index: bool,
+ offset_index: bool,
+ ) -> Result<()> {
+ if !column_index && !offset_index {
+ return Ok(());
+ }
+
+ let mut range = None;
+ for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
+ range = acc_range(range, c.column_index_range());
+ range = acc_range(range, c.offset_index_range());
+ }
+ let range = match range {
+ None => return Ok(()),
+ Some(range) => range,
+ };
+
+ let data = match &self.remainder {
+ Some((remainder_start, remainder)) if *remainder_start <= range.start => {
+ let offset = range.start - *remainder_start;
+ remainder.slice(offset..range.end - *remainder_start + offset)
+ }
+ // Note: this will potentially fetch data already in remainder, this keeps things simple
+ _ => self.fetch.fetch(range.start..range.end).await?,
+ };
+
+ // Sanity check
+ assert_eq!(data.len(), range.end - range.start);
+ let offset = range.start;
+
+ if column_index {
+ let index = self
+ .metadata
+ .row_groups()
+ .iter()
+ .map(|x| {
+ x.columns()
+ .iter()
+ .map(|c| match c.column_index_range() {
+ Some(r) => decode_column_index(
+ &data[r.start - offset..r.end - offset],
+ c.column_type(),
+ ),
+ None => Ok(Index::NONE),
+ })
+ .collect::<Result<Vec<_>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ self.metadata.set_column_index(Some(index));
+ }
+
+ if offset_index {
+ let index = self
+ .metadata
+ .row_groups()
+ .iter()
+ .map(|x| {
+ x.columns()
+ .iter()
+ .map(|c| match c.offset_index_range() {
+ Some(r) => decode_offset_index(
+ &data[r.start - offset..r.end - offset],
+ ),
+ None => Err(general_err!("missing offset index")),
+ })
+ .collect::<Result<Vec<_>>>()
+ })
+ .collect::<Result<Vec<_>>>()?;
+
+ self.metadata.set_offset_index(Some(index));
+ }
+
+ Ok(())
+ }
+
+ /// Returns the finished [`ParquetMetaData`]
+ pub fn finish(self) -> ParquetMetaData {
+ self.metadata
+ }
+}
+
+struct MetadataFetchFn<F>(F);
+
+impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
+where
+ F: FnMut(Range<usize>) -> Fut + Send,
+ Fut: Future<Output = Result<Bytes>> + Send,
+{
+ fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+ async move { self.0(range).await }.boxed()
+ }
+}
+
/// Fetches parquet metadata
///
/// Parameters:
@@ -34,67 +237,22 @@ use std::ops::Range;
/// the last 8 bytes to determine the footer's precise length, before
/// issuing a second request to fetch the metadata bytes
///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
+/// If `prefetch` is `Some`, this will read the specified number of bytes
+/// in the first request, instead of 8, and only issue further requests
+/// if additional bytes are needed. Providing a `prefetch` hint can therefore
+/// significantly reduce the number of `fetch` requests, and consequently latency
pub async fn fetch_parquet_metadata<F, Fut>(
- mut fetch: F,
+ fetch: F,
file_size: usize,
- footer_size_hint: Option<usize>,
+ prefetch: Option<usize>,
) -> Result<ParquetMetaData>
where
- F: FnMut(Range<usize>) -> Fut,
- Fut: Future<Output = Result<Bytes>>,
+ F: FnMut(Range<usize>) -> Fut + Send,
+ Fut: Future<Output = Result<Bytes>> + Send,
{
- if file_size < 8 {
- return Err(ParquetError::EOF(format!(
- "file size of {file_size} is less than footer"
- )));
- }
-
- // If a size hint is provided, read more than the minimum size
- // to try and avoid a second fetch.
- let footer_start = if let Some(size_hint) = footer_size_hint {
- file_size.saturating_sub(size_hint)
- } else {
- file_size - 8
- };
-
- let suffix = fetch(footer_start..file_size).await?;
- let suffix_len = suffix.len();
-
- let mut footer = [0; 8];
- footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
-
- let length = decode_footer(&footer)?;
-
- if file_size < length + 8 {
- return Err(ParquetError::EOF(format!(
- "file size of {} is less than footer + metadata {}",
- file_size,
- length + 8
- )));
- }
-
- // Did not fetch the entire file metadata in the initial read, need to make a second request
- if length > suffix_len - 8 {
- let metadata_start = file_size - length - 8;
- let remaining_metadata = fetch(metadata_start..footer_start).await?;
-
- let mut metadata = BytesMut::with_capacity(length);
-
- metadata.put(remaining_metadata.as_ref());
- metadata.put(&suffix[..suffix_len - 8]);
-
- Ok(decode_metadata(metadata.as_ref())?)
- } else {
- let metadata_start = file_size - length - 8;
-
- Ok(decode_metadata(
- &suffix[metadata_start - footer_start..suffix_len - 8],
- )?)
- }
+ let fetch = MetadataFetchFn(fetch);
+ let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
+ Ok(loader.finish())
}
#[cfg(test)]
@@ -104,6 +262,7 @@ mod tests {
use crate::util::test_common::file_util::get_test_file;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
+ use std::sync::atomic::{AtomicUsize, Ordering};
fn read_range(file: &mut File, range: Range<usize>) -> Result<Bytes> {
file.seek(SeekFrom::Start(range.start as _))?;
@@ -120,28 +279,40 @@ mod tests {
let reader = SerializedFileReader::new(file.try_clone().unwrap()).unwrap();
let expected = reader.metadata().file_metadata().schema();
+ let fetch_count = AtomicUsize::new(0);
+
+ let mut fetch = |range| {
+ fetch_count.fetch_add(1, Ordering::SeqCst);
+ futures::future::ready(read_range(&mut file, range))
+ };
- let mut fetch = |range| futures::future::ready(read_range(&mut file, range));
let actual = fetch_parquet_metadata(&mut fetch, len, None).await.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
// Metadata hint too small
+ fetch_count.store(0, Ordering::SeqCst);
let actual = fetch_parquet_metadata(&mut fetch, len, Some(10))
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
// Metadata hint too large
+ fetch_count.store(0, Ordering::SeqCst);
let actual = fetch_parquet_metadata(&mut fetch, len, Some(500))
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
// Metadata hint exactly correct
+ fetch_count.store(0, Ordering::SeqCst);
let actual = fetch_parquet_metadata(&mut fetch, len, Some(428))
.await
.unwrap();
assert_eq!(actual.file_metadata().schema(), expected);
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
let err = fetch_parquet_metadata(&mut fetch, 4, None)
.await
@@ -155,4 +326,53 @@ mod tests {
.to_string();
assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
}
+
+ #[tokio::test]
+ async fn test_page_index() {
+ let mut file = get_test_file("alltypes_tiny_pages.parquet");
+ let len = file.len() as usize;
+ let fetch_count = AtomicUsize::new(0);
+ let mut fetch = |range| {
+ fetch_count.fetch_add(1, Ordering::SeqCst);
+ futures::future::ready(read_range(&mut file, range))
+ };
+
+ let f = MetadataFetchFn(&mut fetch);
+ let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+ loader.load_page_index(true, true).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
+ let metadata = loader.finish();
+ assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+ // Prefetch just footer exactly
+ fetch_count.store(0, Ordering::SeqCst);
+ let f = MetadataFetchFn(&mut fetch);
+ let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+ loader.load_page_index(true, true).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+ let metadata = loader.finish();
+ assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+ // Prefetch more than footer but not enough
+ fetch_count.store(0, Ordering::SeqCst);
+ let f = MetadataFetchFn(&mut fetch);
+ let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+ loader.load_page_index(true, true).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+ let metadata = loader.finish();
+ assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+ // Prefetch exactly enough
+ fetch_count.store(0, Ordering::SeqCst);
+ let f = MetadataFetchFn(&mut fetch);
+ let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+ loader.load_page_index(true, true).await.unwrap();
+ assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+ let metadata = loader.finish();
+ assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+ }
}
diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs
index 3d4277a83..fb81a2b5d 100644
--- a/parquet/src/arrow/async_reader/mod.rs
+++ b/parquet/src/arrow/async_reader/mod.rs
@@ -107,10 +107,6 @@ use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
-use crate::file::page_index::index::Index;
-use crate::file::page_index::index_reader::{
- acc_range, decode_column_index, decode_offset_index,
-};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::PageLocation;
@@ -243,53 +239,10 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
&& metadata.column_index().is_none()
&& metadata.offset_index().is_none()
{
- let fetch = metadata.row_groups().iter().flat_map(|r| r.columns()).fold(
- None,
- |a, c| {
- let a = acc_range(a, c.column_index_range());
- acc_range(a, c.offset_index_range())
- },
- );
-
- if let Some(fetch) = fetch {
- let bytes = input.get_bytes(fetch.clone()).await?;
- let get = |r: Range<usize>| {
- &bytes[(r.start - fetch.start)..(r.end - fetch.start)]
- };
-
- let mut offset_index = Vec::with_capacity(metadata.num_row_groups());
- let mut column_index = Vec::with_capacity(metadata.num_row_groups());
- for rg in metadata.row_groups() {
- let columns = rg.columns();
- let mut rg_offset_index = Vec::with_capacity(columns.len());
- let mut rg_column_index = Vec::with_capacity(columns.len());
-
- for chunk in rg.columns() {
- let t = chunk.column_type();
- let c = match chunk.column_index_range() {
- Some(range) => decode_column_index(get(range), t)?,
- None => Index::NONE,
- };
-
- let o = match chunk.offset_index_range() {
- Some(range) => decode_offset_index(get(range))?,
- None => return Err(general_err!("missing offset index")),
- };
-
- rg_column_index.push(c);
- rg_offset_index.push(o);
- }
- offset_index.push(rg_offset_index);
- column_index.push(rg_column_index);
- }
-
- metadata = Arc::new(ParquetMetaData::new_with_page_index(
- metadata.file_metadata().clone(),
- metadata.row_groups().to_vec(),
- Some(column_index),
- Some(offset_index),
- ));
- }
+ let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone());
+ let mut loader = MetadataLoader::new(&mut input, m);
+ loader.load_page_index(true, true).await?;
+ metadata = Arc::new(loader.finish())
}
Self::new_builder(AsyncReader(input), metadata, options)
diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs
index eb64b11b9..40d982ced 100644
--- a/parquet/src/arrow/async_reader/store.rs
+++ b/parquet/src/arrow/async_reader/store.rs
@@ -24,7 +24,7 @@ use futures::{FutureExt, TryFutureExt};
use object_store::{ObjectMeta, ObjectStore};
-use crate::arrow::async_reader::{fetch_parquet_metadata, AsyncFileReader};
+use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader};
use crate::errors::{ParquetError, Result};
use crate::file::metadata::ParquetMetaData;
@@ -34,6 +34,8 @@ pub struct ParquetObjectReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
metadata_size_hint: Option<usize>,
+ preload_column_index: bool,
+ preload_offset_index: bool,
}
impl ParquetObjectReader {
@@ -45,16 +47,35 @@ impl ParquetObjectReader {
store,
meta,
metadata_size_hint: None,
+ preload_column_index: false,
+ preload_offset_index: false,
}
}
- /// Provide a hint as to the size of the parquet file's footer, see [fetch_parquet_metadata]
+ /// Provide a hint as to the size of the parquet file's footer,
+ /// see [fetch_parquet_metadata](crate::arrow::async_reader::fetch_parquet_metadata)
pub fn with_footer_size_hint(self, hint: usize) -> Self {
Self {
metadata_size_hint: Some(hint),
..self
}
}
+
+ /// Load the Column Index as part of [`Self::get_metadata`]
+ pub fn with_preload_column_index(self, preload_column_index: bool) -> Self {
+ Self {
+ preload_column_index,
+ ..self
+ }
+ }
+
+ /// Load the Offset Index as part of [`Self::get_metadata`]
+ pub fn with_preload_offset_index(self, preload_offset_index: bool) -> Self {
+ Self {
+ preload_offset_index,
+ ..self
+ }
+ }
}
impl AsyncFileReader for ParquetObjectReader {
@@ -89,21 +110,15 @@ impl AsyncFileReader for ParquetObjectReader {
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
- let metadata = fetch_parquet_metadata(
- |range| {
- self.store
- .get_range(&self.meta.location, range)
- .map_err(|e| {
- ParquetError::General(format!(
- "ParquetObjectReader::get_metadata error: {e}"
- ))
- })
- },
- self.meta.size,
- self.metadata_size_hint,
- )
- .await?;
- Ok(Arc::new(metadata))
+ let preload_column_index = self.preload_column_index;
+ let preload_offset_index = self.preload_offset_index;
+ let file_size = self.meta.size;
+ let prefetch = self.metadata_size_hint;
+ let mut loader = MetadataLoader::load(self, file_size, prefetch).await?;
+ loader
+ .load_page_index(preload_column_index, preload_offset_index)
+ .await?;
+ Ok(Arc::new(loader.finish()))
})
}
}
@@ -150,7 +165,11 @@ mod tests {
Ok(_) => panic!("expected failure"),
Err(e) => {
let err = e.to_string();
- assert!(err.contains("Parquet error: ParquetObjectReader::get_metadata error: Object at location") && err.contains("not found: No such file or directory (os error 2)"), "{}", err);
+ assert!(
+ err.contains("not found: No such file or directory (os error 2)"),
+ "{}",
+ err
+ );
}
}
}
diff --git a/parquet/src/file/footer.rs b/parquet/src/file/footer.rs
index 7cc92afc0..fcd6a300c 100644
--- a/parquet/src/file/footer.rs
+++ b/parquet/src/file/footer.rs
@@ -61,16 +61,19 @@ pub fn parse_metadata<R: ChunkReader>(chunk_reader: &R) -> Result<ParquetMetaDat
));
}
- let metadata =
- chunk_reader.get_bytes(file_size - footer_metadata_len as u64, metadata_len)?;
-
- decode_metadata(&metadata)
+ let start = file_size - footer_metadata_len as u64;
+ read_metadata(chunk_reader.get_read(start)?)
}
/// Decodes [`ParquetMetaData`] from the provided bytes
pub fn decode_metadata(metadata_read: &[u8]) -> Result<ParquetMetaData> {
+ read_metadata(metadata_read)
+}
+
+/// Decodes [`ParquetMetaData`] from the provided [`Read`]
+pub(crate) fn read_metadata<R: Read>(read: R) -> Result<ParquetMetaData> {
// TODO: row group filtering
- let mut prot = TCompactInputProtocol::new(metadata_read);
+ let mut prot = TCompactInputProtocol::new(read);
let t_file_metadata: TFileMetaData = TFileMetaData::read_from_in_protocol(&mut prot)
.map_err(|e| ParquetError::General(format!("Could not parse metadata: {e}")))?;
let schema = types::from_thrift(&t_file_metadata.schema)?;
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 85287c3e0..c2961aa76 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -153,6 +153,18 @@ impl ParquetMetaData {
pub fn offset_index(&self) -> Option<&ParquetOffsetIndex> {
self.offset_index.as_ref()
}
+
+ /// Override the column index
+ #[allow(dead_code)]
+ pub(crate) fn set_column_index(&mut self, index: Option<ParquetColumnIndex>) {
+ self.column_index = index;
+ }
+
+ /// Override the offset index
+ #[allow(dead_code)]
+ pub(crate) fn set_offset_index(&mut self, index: Option<ParquetOffsetIndex>) {
+ self.offset_index = index;
+ }
}
pub type KeyValue = crate::format::KeyValue;