You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "Ted-Jiang (via GitHub)" <gi...@apache.org> on 2023/05/17 06:41:09 UTC

[GitHub] [arrow-rs] Ted-Jiang commented on a diff in pull request #4216: Prefetch page index (#4090)

Ted-Jiang commented on code in PR #4216:
URL: https://github.com/apache/arrow-rs/pull/4216#discussion_r1196013421


##########
parquet/src/arrow/async_reader/metadata.rs:
##########
@@ -15,95 +15,250 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::arrow::async_reader::AsyncFileReader;
 use crate::errors::{ParquetError, Result};
-use crate::file::footer::{decode_footer, decode_metadata};
+use crate::file::footer::{decode_footer, read_metadata};
 use crate::file::metadata::ParquetMetaData;
-use bytes::{BufMut, Bytes, BytesMut};
+use crate::file::page_index::index::Index;
+use crate::file::page_index::index_reader::{
+    acc_range, decode_column_index, decode_offset_index,
+};
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::FutureExt;
 use std::future::Future;
+use std::io::Read;
 use std::ops::Range;
 
-/// Fetches parquet metadata
-///
-/// Parameters:
-/// * fetch: an async function that can fetch byte ranges
-/// * file_size: the total size of the parquet file
-/// * footer_size_hint: footer prefetch size (see comments below)
-///
-/// The length of the parquet footer, which contains file metadata, is not
-/// known up front. Therefore this function will first issue a request to read
-/// the last 8 bytes to determine the footer's precise length, before
-/// issuing a second request to fetch the metadata bytes
-///
-/// If a hint is set, this method will read the specified number of bytes
-/// in the first request, instead of 8, and only issue a second request
-/// if additional bytes are needed. This can therefore eliminate a
-/// potentially costly additional fetch operation
-pub async fn fetch_parquet_metadata<F, Fut>(
-    mut fetch: F,
-    file_size: usize,
-    footer_size_hint: Option<usize>,
-) -> Result<ParquetMetaData>
-where
-    F: FnMut(Range<usize>) -> Fut,
-    Fut: Future<Output = Result<Bytes>>,
-{
-    if file_size < 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {file_size} is less than footer"
-        )));
+/// A data source that can be used with [`MetadataLoader`] to load [`ParquetMetaData`]
+pub(crate) trait MetadataFetch {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>>;
+}
+
+impl<'a, T: AsyncFileReader> MetadataFetch for &'a mut T {
+    fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, Result<Bytes>> {
+        self.get_bytes(range)
     }
+}
+
+/// An asynchronous interface to load [`ParquetMetaData`] from an async source
+pub(crate) struct MetadataLoader<F> {
+    /// Function that fetches byte ranges asynchronously
+    fetch: F,
+    /// The in-progress metadata
+    metadata: ParquetMetaData,
+    /// The offset and bytes of remaining unparsed data
+    remainder: Option<(usize, Bytes)>,
+}
+
+impl<F: MetadataFetch> MetadataLoader<F> {
+    /// Create a new [`MetadataLoader`] by reading the footer information
+    ///
+    /// Parameters:
+    /// * fetch: an async function that can fetch byte ranges
+    /// * file_size: the total size of the parquet file
+    /// * footer_size_hint: footer prefetch size (see comments below)
+    ///
+    /// The length of the parquet footer, which contains file metadata, is not
+    /// known up front. Therefore this function will first issue a request to read
+    /// the last 8 bytes to determine the footer's precise length, before
+    /// issuing a second request to fetch the metadata bytes
+    ///
+    /// If a `prefetch` is `Some`, this will read the specified number of bytes
+    /// in the first request, instead of 8, and only issue further requests
+    /// if additional bytes are needed. Providing a hint can therefore significantly
+    /// reduce the number of `fetch` requests, and consequently latency
+    pub async fn load(
+        mut fetch: F,
+        file_size: usize,
+        prefetch: Option<usize>,
+    ) -> Result<Self> {
+        if file_size < 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {file_size} is less than footer"
+            )));
+        }
 
-    // If a size hint is provided, read more than the minimum size
-    // to try and avoid a second fetch.
-    let footer_start = if let Some(size_hint) = footer_size_hint {
-        file_size.saturating_sub(size_hint)
-    } else {
-        file_size - 8
-    };
+        // If a size hint is provided, read more than the minimum size
+        // to try and avoid a second fetch.
+        let footer_start = if let Some(size_hint) = prefetch {
+            file_size.saturating_sub(size_hint)
+        } else {
+            file_size - 8
+        };
 
-    let suffix = fetch(footer_start..file_size).await?;
-    let suffix_len = suffix.len();
+        let suffix = fetch.fetch(footer_start..file_size).await?;
+        let suffix_len = suffix.len();
 
-    let mut footer = [0; 8];
-    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
+        let mut footer = [0; 8];
+        footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
-    let length = decode_footer(&footer)?;
+        let length = decode_footer(&footer)?;
 
-    if file_size < length + 8 {
-        return Err(ParquetError::EOF(format!(
-            "file size of {} is less than footer + metadata {}",
-            file_size,
-            length + 8
-        )));
+        if file_size < length + 8 {
+            return Err(ParquetError::EOF(format!(
+                "file size of {} is less than footer + metadata {}",
+                file_size,
+                length + 8
+            )));
+        }
+
+        // Did not fetch the entire file metadata in the initial read, need to make a second request
+        let (metadata, remainder) = if length > suffix_len - 8 {
+            let metadata_start = file_size - length - 8;
+            let remaining_metadata = fetch.fetch(metadata_start..footer_start).await?;
+
+            let reader = remaining_metadata.as_ref().chain(&suffix[..suffix_len - 8]);
+            (read_metadata(reader)?, None)
+        } else {
+            let metadata_start = file_size - length - 8 - footer_start;

Review Comment:
   why need minus  `footer_start ` 🤔 . Not consist with datafusion https://github.com/apache/arrow-datafusion/blob/6e819d6c2b9280198c67fa16df3e54c79ce46ca2/datafusion/core/src/datasource/file_format/parquet.rs#L420



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org