Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/05/15 13:26:33 UTC

[GitHub] [arrow-rs] tustvold opened a new pull request, #4216: Prefetch page index (#4090)

tustvold opened a new pull request, #4216:
URL: https://github.com/apache/arrow-rs/pull/4216

   # Which issue does this PR close?
   
   Closes #4090
   
   # Rationale for this change
    
   # What changes are included in this PR?
   
   # Are there any user-facing changes?
   
   This adds `Send` constraints to `fetch_parquet_metadata`; this is unlikely to trip people up in practice.
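
To illustrate the `Send` constraint, here is a minimal, standard-library-only sketch of the adapter pattern that appears in the diff as `MetadataFetchFn`. It is not the crate's code: `FetchFn`, `block_on`, `demo`, the local `BoxFuture` alias, and the `Vec<u8>`/`String` result type are stand-ins assumed for illustration. The point it tries to show is that boxing the future as `dyn Future + Send` forces both the closure and the future it returns to be `Send`.

```rust
use std::future::Future;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for `futures::future::BoxFuture` (assumption: same shape).
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical adapter, analogous in spirit to `MetadataFetchFn` in the diff.
struct FetchFn<F>(F);

impl<F, Fut> FetchFn<F>
where
    // Both bounds are needed: the boxed future borrows the closure and
    // awaits the future the closure returns.
    F: FnMut(Range<usize>) -> Fut + Send,
    Fut: Future<Output = Result<Vec<u8>, String>> + Send,
{
    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Vec<u8>, String>> {
        Box::pin(async move { (self.0)(range).await })
    }
}

// Waker that does nothing; enough to poll futures that are already ready.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling executor, only suitable for this illustration.
fn block_on<T>(fut: impl Future<Output = T>) -> T {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Fetches bytes 4..8 from an in-memory buffer through the adapter.
fn demo() -> Vec<u8> {
    let data: Vec<u8> = (0u8..16).collect();
    let mut fetcher = FetchFn(|range: Range<usize>| {
        std::future::ready(Ok::<_, String>(data[range].to_vec()))
    });
    let bytes = block_on(fetcher.fetch(4..8)).unwrap();
    bytes
}
```

With these bounds, a closure that captures an `Rc`, or that returns a non-`Send` future, is rejected at compile time. Closures over plain buffers, as in `demo`, are unaffected, which is presumably why the change is not expected to affect many callers.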
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-rs] alamb commented on a diff in pull request #4216: Prefetch page index (#4090)

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#discussion_r1195683538


##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes

Review Comment:
   👍 



##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,252 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {

Review Comment:
   I think it is worth adding a comment in the code explaining why the struct is kept crate-private, so future readers know
   
   ```suggestion
   /// 
   /// crate private until the interface is stabilized
   pub(crate) struct MetadataLoader<F> {
   ```



##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -155,4 +322,53 @@ mod tests {
             .to_string();
         assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer");
     }
+
+    #[tokio::test]
+    async fn test_page_index() {
+        let mut file = get_test_file("alltypes_tiny_pages.parquet");
+        let len = file.len() as usize;
+        let fetch_count = AtomicUsize::new(0);
+        let mut fetch = |range| {
+            fetch_count.fetch_add(1, Ordering::SeqCst);
+            futures::future::ready(read_range(&mut file, range))
+        };
+
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, None).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 3);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch just footer exactly
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, Some(1729)).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch more than footer but not enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, Some(130649)).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 2);
+        let metadata = loader.finish();
+        assert!(metadata.offset_index().is_some() && metadata.column_index().is_some());
+
+        // Prefetch exactly enough
+        fetch_count.store(0, Ordering::SeqCst);
+        let f = MetadataFetchFn(&mut fetch);
+        let mut loader = MetadataLoader::load(f, len, Some(130650)).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+        loader.load_page_index(true, true).await.unwrap();
+        assert_eq!(fetch_count.load(Ordering::SeqCst), 1);

Review Comment:
   🎉 
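
The fetch counts asserted in this test follow from the control flow of `load` and `load_page_index` in the diff. The sketch below is a simplified model of that control flow, not the crate's code: `fetch_counts` and its parameters are hypothetical names, and the file size used in the usage note is an assumed value. The metadata length of 1721 bytes and the page index starting 130650 bytes before the end of the file are inferred from the test comments ("Prefetch just footer exactly" with 1729 = 1721 + 8, and "Prefetch exactly enough" with 130650).

```rust
/// Models how many `fetch` calls are made, returning
/// `(calls after load, calls after load + load_page_index)`.
///
/// Assumes `metadata_len + 8 <= file_size`, that a hint (if any) is at
/// least 8 bytes, and that the file actually contains a page index
/// starting at `page_index_start`.
fn fetch_counts(
    file_size: usize,
    metadata_len: usize,
    page_index_start: usize,
    prefetch: Option<usize>,
) -> (usize, usize) {
    // Same choice as in `load`: read `hint` bytes from the end, or just the footer.
    let footer_start = match prefetch {
        Some(hint) => file_size.saturating_sub(hint),
        None => file_size - 8,
    };
    let suffix_len = file_size - footer_start;

    // The first request is always issued.
    let mut load_fetches = 1;

    // If the suffix does not contain all of the metadata, a second request
    // is needed and no prefetched remainder is kept.
    let remainder_start = if metadata_len > suffix_len - 8 {
        load_fetches += 1;
        None
    } else {
        Some(footer_start)
    };

    // The page index is served from the remainder only if the remainder
    // starts at or before the first page index byte.
    let index_fetches = match remainder_start {
        Some(start) if start <= page_index_start => 0,
        _ => 1,
    };

    (load_fetches, load_fetches + index_fetches)
}
```

For an assumed file size of 500000 bytes, `fetch_counts(500_000, 1721, 500_000 - 130_650, hint)` gives `(2, 3)` with no hint, `(1, 2)` for hints of 1729 and 130649, and `(1, 1)` for a hint of 130650, matching the four groups of assertions in the test.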



##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes
+    /// in the first request, instead of 8, and only issue further requests
+    /// if additional bytes are needed. Providing a hint can therefore significantly
+    /// reduce the number of `fetch` requests, and consequently latency
+    pub async fn load(
+        mut fetch: F,
+        file_size: usize,
+        prefetch: Option<usize>,
+    ) -> Result<Self> {
+        if file_size < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {file_size} is less than footer"
+            )));
+        }
 
-    // If a size hint is provided, read more than the minimum size
-    // to try and avoid a second fetch.
-    let footer_start = if let Some(size_hint) = footer_size_hint {
-        file_size.saturating_sub(size_hint)
-    } else {
-        file_size - 8
-    };
+        // If a size hint is provided, read more than the minimum size
+        // to try and avoid a second fetch.
+        let footer_start = if let Some(size_hint) = prefetch {
+            file_size.saturating_sub(size_hint)
+        } else {
+            file_size - 8
+        };
 
-    let suffix = fetch(footer_start..file_size).await?;
-    let suffix_len = suffix.len();
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
 
-    let mut footer = [0; 8];
-    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
-    let length = decode_footer(&footer)?;
+        let length = decode_footer(&footer)?;
 
-    if file_size < length + 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {} is less than footer + metadata {}",
-            file_size,
-            length + 8
-        )));
+        if file_size < length + 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + 8
+            )));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_start = file_size - length - 8;
+            let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
+
+            let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+            (read_metadata(reader)?, None)
+        } else {
+            let metadata_start = file_size - length - 8 - footer_start;
+
+            let slice = &suffix[metadata_start..suffix_len - 8];
+            (
+                read_metadata(slice)?,
+                Some((footer_start, suffix.slice(..metadata_start))),
+            )
+        };
+
+        Ok(Self {
+            fetch,
+            metadata,
+            remainder,
+        })
+    }
+
+    /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
+    pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
+        Self {
+            fetch,
+            metadata,
+            remainder: None,
+        }
     }
 
-    // Did not fetch the entire file metadata in the initial read, need to make a second request
-    if length > suffix_len - 8 {
-        let metadata_start = file_size - length - 8;
-        let remaining_metadata = fetch(metadata_start..footer_start).await?;
+    /// Loads the page index, if any
+    ///
+    /// * `column_index`: if true will load column index
+    /// * `offset_index`: if true will load offset index
+    pub async fn load_page_index(
+        &mut self,
+        column_index: bool,
+        offset_index: bool,
+    ) -> Result<()> {
+        if !column_index && !offset_index {
+            return Ok(());
+        }
 
-        let mut metadata = BytesMut::with_capacity(length);
+        let mut range = None;
+        for c in self.metadata.row_groups().iter().flat_map(|r| r.columns()) {
+            range = acc_range(range, c.column_index_range());
+            range = acc_range(range, c.offset_index_range());
+        }
+        let range = match range {
+            None => return Ok(()),
+            Some(range) => range,
+        };
 
-        metadata.put(remaining_metadata.as_ref());
-        metadata.put(&suffix[..suffix_len - 8]);
+        let data = match &self.remainder {
+            Some((remainder_start, remainder)) if *remainder_start <= range.start => {
+                let offset = range.start - *remainder_start;
+                remainder.slice(offset..range.end - *remainder_start + offset)
+            }
+            // Note: this will potentially fetch data already in remainder, this keeps things simple
+            _ => self.fetch.fetch(range.start..range.end).await?,
+        };
 
-        Ok(decode_metadata(metadata.as_ref())?)
-    } else {
-        let metadata_start = file_size - length - 8;
+        // Sanity check
+        assert_eq!(data.len(), range.end - range.start);
+        let offset = range.start;
+
+        if column_index {
+            let index = self
+                .metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.column_index_range() {
+                            Some(r) => decode_column_index(
+                                &data[r.start - offset..r.end - offset],
+                                c.column_type(),
+                            ),
+                            None => Ok(Index::NONE),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            self.metadata.set_column_index(Some(index));
+        }
+
+        if offset_index {
+            let index = self
+                .metadata
+                .row_groups()
+                .iter()
+                .map(|x| {
+                    x.columns()
+                        .iter()
+                        .map(|c| match c.offset_index_range() {
+                            Some(r) => decode_offset_index(
+                                &data[r.start - offset..r.end - offset],
+                            ),
+                            None => Err(general_err!("missing offset index")),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;
+
+            self.metadata.set_offset_index(Some(index));
+        }
+
+        Ok(())
+    }
 
-        Ok(decode_metadata(
-            &suffix[metadata_start - footer_start..suffix_len - 8],
-        )?)
+    /// Returns the finished [`ParquetMetaData`]
+    pub fn finish(self) -> ParquetMetaData {
+        self.metadata
     }
 }
 
+struct MetadataFetchFn<F>(F);
+
+impl<F, Fut> MetadataFetch for MetadataFetchFn<F>
+where
+    F: FnMut(Range<usize>) -> Fut + Send,
+    Fut: Future<Output = Result<Bytes>> + Send,
+{
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        async move { self.0(range).await }.boxed()
+    }
+}
+
+/// Fetches parquet metadata

Review Comment:
   Given this is the public interface, I think we should keep the doc comments on it as well (even if they are redundant with what is on `MetadataFetch`)





[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #4216: Prefetch page index (#4090)

Posted by "Ted-Jiang (via GitHub)" <gi...@apache.org>.
Ted-Jiang commented on code in PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#discussion_r1196013421


##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes
+    /// in the first request, instead of 8, and only issue further requests
+    /// if additional bytes are needed. Providing a hint can therefore significantly
+    /// reduce the number of `fetch` requests, and consequently latency
+    pub async fn load(
+        mut fetch: F,
+        file_size: usize,
+        prefetch: Option<usize>,
+    ) -> Result<Self> {
+        if file_size < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {file_size} is less than footer"
+            )));
+        }
 
-    // If a size hint is provided, read more than the minimum size
-    // to try and avoid a second fetch.
-    let footer_start = if let Some(size_hint) = footer_size_hint {
-        file_size.saturating_sub(size_hint)
-    } else {
-        file_size - 8
-    };
+        // If a size hint is provided, read more than the minimum size
+        // to try and avoid a second fetch.
+        let footer_start = if let Some(size_hint) = prefetch {
+            file_size.saturating_sub(size_hint)
+        } else {
+            file_size - 8
+        };
 
-    let suffix = fetch(footer_start..file_size).await?;
-    let suffix_len = suffix.len();
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
 
-    let mut footer = [0; 8];
-    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
-    let length = decode_footer(&footer)?;
+        let length = decode_footer(&footer)?;
 
-    if file_size < length + 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {} is less than footer + metadata {}",
-            file_size,
-            length + 8
-        )));
+        if file_size < length + 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + 8
+            )));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_start = file_size - length - 8;
+            let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
+
+            let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+            (read_metadata(reader)?, None)
+        } else {
+            let metadata_start = file_size - length - 8 - footer_start;

Review Comment:
   Why do we need to subtract `footer_start` here? 🤔 This is not consistent with DataFusion: https://github.com/apache/arrow-datafusion/blob/6e819d6c2b9280198c67fa16df3e54c79ce46ca2/datafusion/core/src/datasource/file_format/parquet.rs#L420
   
   If I am missing something, please tell me.
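
One way to read the subtraction: `suffix` holds the file bytes `footer_start..file_size`, so an absolute file offset has to be shifted by `footer_start` before it can index into `suffix`. The removed code in the diff computed the absolute offset first and subtracted `footer_start` at the slice site; the new code folds the subtraction into `metadata_start`. The sketch below works through that arithmetic. It is a standalone illustration under those assumptions, and `metadata_range_in_suffix` is a hypothetical helper, not a function from the crate.

```rust
use std::ops::Range;

/// Returns where the metadata bytes sit inside `suffix`, given that
/// `suffix` holds file bytes `footer_start..file_size` and that the last
/// 8 bytes of the file are the footer.
///
/// Returns `None` if the metadata is not fully contained in `suffix`.
fn metadata_range_in_suffix(
    file_size: usize,
    footer_start: usize,
    metadata_len: usize,
) -> Option<Range<usize>> {
    let suffix_len = file_size.checked_sub(footer_start)?;

    // Absolute file offset of the first metadata byte.
    let absolute_start = file_size.checked_sub(metadata_len + 8)?;

    // Shift into suffix-relative coordinates; fails if the metadata
    // begins before the prefetched bytes.
    let relative_start = absolute_start.checked_sub(footer_start)?;

    // The metadata ends where the 8-byte footer begins.
    Some(relative_start..suffix_len - 8)
}
```

For example, with a 1000-byte file, a 100-byte prefetch (`footer_start = 900`), and 50 bytes of metadata, the metadata starts at absolute offset 942, which is index 42 of `suffix`, so the slice is `42..92`. With no hint (`footer_start = 992`) the metadata lies outside `suffix` and the helper returns `None`, which corresponds to the branch that issues a second request.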





[GitHub] [arrow-rs] tustvold merged pull request #4216: Prefetch page index (#4090)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold merged PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216




[GitHub] [arrow-rs] Ted-Jiang commented on pull request #4216: Prefetch page index (#4090)

Posted by "Ted-Jiang (via GitHub)" <gi...@apache.org>.
Ted-Jiang commented on PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#issuecomment-1550602555

   > Thank you @tustvold -- I went through this PR carefully and it looks really nice 👌
   > 
   > I wonder if the same basic pattern could be applied to the Bloom filters as well, or if they suffer from the issue that they don't actually appear in the footer 🤔
   > 
   > cc @thinkharderdev and @Ted-Jiang
   
   Thanks for pinging me, I will review this carefully today.




[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #4216: Prefetch page index (#4090)

Posted by "Ted-Jiang (via GitHub)" <gi...@apache.org>.
Ted-Jiang commented on code in PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#discussion_r1196013421


##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes
+    /// in the first request, instead of 8, and only issue further requests
+    /// if additional bytes are needed. Providing a hint can therefore significantly
+    /// reduce the number of `fetch` requests, and consequently latency
+    pub async fn load(
+        mut fetch: F,
+        file_size: usize,
+        prefetch: Option<usize>,
+    ) -> Result<Self> {
+        if file_size < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {file_size} is less than footer"
+            )));
+        }
 
-    // If a size hint is provided, read more than the minimum size
-    // to try and avoid a second fetch.
-    let footer_start = if let Some(size_hint) = footer_size_hint {
-        file_size.saturating_sub(size_hint)
-    } else {
-        file_size - 8
-    };
+        // If a size hint is provided, read more than the minimum size
+        // to try and avoid a second fetch.
+        let footer_start = if let Some(size_hint) = prefetch {
+            file_size.saturating_sub(size_hint)
+        } else {
+            file_size - 8
+        };
 
-    let suffix = fetch(footer_start..file_size).await?;
-    let suffix_len = suffix.len();
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
 
-    let mut footer = [0; 8];
-    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
-    let length = decode_footer(&footer)?;
+        let length = decode_footer(&footer)?;
 
-    if file_size < length + 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {} is less than footer + metadata {}",
-            file_size,
-            length + 8
-        )));
+        if file_size < length + 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + 8
+            )));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_start = file_size - length - 8;
+            let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
+
+            let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+            (read_metadata(reader)?, None)
+        } else {
+            let metadata_start = file_size - length - 8 - footer_start;

Review Comment:
   Why do we need to subtract `footer_start` here? 🤔 This is not consistent with DataFusion: https://github.com/apache/arrow-datafusion/blob/6e819d6c2b9280198c67fa16df3e54c79ce46ca2/datafusion/core/src/datasource/file_format/parquet.rs#L420
   
   If I am missing something, please tell me.





[GitHub] [arrow-rs] tustvold commented on pull request #4216: Prefetch page index (#4090)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#issuecomment-1551236094

   > I wonder if the same basic pattern could be applied to the Bloom filters as well
   
   Yes, this was designed with them in mind. Bloom filters can be stored at the end of the file, which would allow prefetching to help; I'm not sure the writer currently does this, though.
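
The point about prefetching can be illustrated with a small sketch. It assumes the loader keeps the prefetched tail of the file together with its starting offset, as the `remainder: Option<(usize, Bytes)>` field in the diff suggests, and that any later structure (page index, or potentially a Bloom filter) whose byte range falls entirely inside that tail can be served without another `fetch`. The function name `slice_prefetched` and its signature are hypothetical and are not part of the PR.

```rust
use std::ops::Range;

/// Hypothetical helper (not from the PR): returns the part of `suffix`
/// that covers `range`, if `range` lies entirely inside the prefetched
/// region. `suffix_start` is the absolute file offset of `suffix[0]`.
fn slice_prefetched<'a>(
    suffix_start: usize,
    suffix: &'a [u8],
    range: Range<usize>,
) -> Option<&'a [u8]> {
    let suffix_end = suffix_start + suffix.len();
    let covered = range.start <= range.end
        && range.start >= suffix_start
        && range.end <= suffix_end;
    if covered {
        // Translate absolute file offsets into indices within `suffix`
        Some(&suffix[range.start - suffix_start..range.end - suffix_start])
    } else {
        // Not fully prefetched: the caller would need another fetch
        None
    }
}
```

A `None` result corresponds to the case where a further request is unavoidable; a larger prefetch hint makes that case less likely at the cost of reading more bytes up front.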




[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #4216: Prefetch page index (#4090)

Posted by "Ted-Jiang (via GitHub)" <gi...@apache.org>.
Ted-Jiang commented on code in PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#discussion_r1196023174


##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes
+    /// in the first request, instead of 8, and only issue further requests
+    /// if additional bytes are needed. Providing a hint can therefore significantly
+    /// reduce the number of `fetch` requests, and consequently latency
+    pub async fn load(
+        mut fetch: F,
+        file_size: usize,
+        prefetch: Option<usize>,
+    ) -> Result<Self> {
+        if file_size < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {file_size} is less than footer"
+            )));
+        }
 
-    // If a size hint is provided, read more than the minimum size
-    // to try and avoid a second fetch.
-    let footer_start = if let Some(size_hint) = footer_size_hint {
-        file_size.saturating_sub(size_hint)
-    } else {
-        file_size - 8
-    };
+        // If a size hint is provided, read more than the minimum size
+        // to try and avoid a second fetch.
+        let footer_start = if let Some(size_hint) = prefetch {
+            file_size.saturating_sub(size_hint)
+        } else {
+            file_size - 8
+        };
 
-    let suffix = fetch(footer_start..file_size).await?;
-    let suffix_len = suffix.len();
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
 
-    let mut footer = [0; 8];
-    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
-    let length = decode_footer(&footer)?;
+        let length = decode_footer(&footer)?;
 
-    if file_size < length + 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {} is less than footer + metadata {}",
-            file_size,
-            length + 8
-        )));
+        if file_size < length + 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + 8
+            )));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_start = file_size - length - 8;
+            let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
+
+            let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+            (read_metadata(reader)?, None)
+        } else {
+            let metadata_start = file_size - length - 8 - footer_start;

Review Comment:
   Oh, it is totally different logic 🤣 my mistake!
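
For readers following the thread, here is a minimal sketch of the arithmetic in question. In the first branch of the diff, `metadata_start` is an absolute file offset passed to `fetch`. In the `else` branch the metadata is already contained in `suffix`, which holds bytes `footer_start..file_size`, so subtracting `footer_start` turns the absolute offset into a position relative to the start of `suffix`. How the PR uses that value afterwards is not shown in the quoted hunk, so treating it as an index into `suffix` is an assumption, and the helper names below are hypothetical.

```rust
/// Absolute file offset at which the metadata starts: the file ends with
/// `length` bytes of metadata followed by the 8-byte footer.
fn metadata_start_absolute(file_size: usize, length: usize) -> usize {
    file_size - length - 8
}

/// Hypothetical helper: the same position expressed relative to a buffer
/// that holds bytes `footer_start..file_size` of the file.
fn metadata_start_in_suffix(file_size: usize, length: usize, footer_start: usize) -> usize {
    metadata_start_absolute(file_size, length) - footer_start
}
```

For example, with a 1000-byte file, a 100-byte prefetch (`footer_start` of 900) and 50 bytes of metadata, the absolute offset is 942 and the offset within the 100-byte suffix is 42, so the metadata would occupy `suffix[42..92]`, ending exactly at `suffix_len - 8`.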





[GitHub] [arrow-rs] tustvold commented on a diff in pull request #4216: Prefetch page index (#4090)

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#discussion_r1193836421


##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,252 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {

Review Comment:
   I opted to keep this crate-private for now, as I'm not totally happy with the interface.


