Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/21 10:26:06 UTC

[arrow-datafusion] branch master updated: Add metadata_size_hint for optimistic fetching of parquet metadata (#2946)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 834924f27 Add metadata_size_hint for optimistic fetching of parquet metadata (#2946)
834924f27 is described below

commit 834924f274d9e04444030b8eb3e07e1035fcd3cf
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Thu Jul 21 06:26:00 2022 -0400

    Add metadata_size_hint for optimistic fetching of parquet metadata (#2946)
    
    * Add metadata_size_hint for optimistic fetching of parquet metadata
    
    * Formatting
    
    * Update datafusion/core/src/datasource/file_format/parquet.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update datafusion/core/src/datasource/file_format/parquet.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * PR comments: Guard against size_hint larger than file size and verify request counts in unit test
    
    * Update datafusion/core/src/datasource/file_format/parquet.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 .../core/src/datasource/file_format/parquet.rs     | 262 +++++++++++++++++++--
 .../core/src/physical_optimizer/repartition.rs     |   1 +
 .../core/src/physical_plan/file_format/parquet.rs  |  37 ++-
 3 files changed, 276 insertions(+), 24 deletions(-)
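
For context, the new hint is exposed as a builder-style setter on ParquetFormat (see the first file in the diff below). A minimal usage sketch, assuming the usual `datafusion` crate re-export of the core datasource module; the 512 KiB value is an arbitrary illustration, not part of this commit:

    use datafusion::datasource::file_format::parquet::ParquetFormat;

    fn main() {
        // Optimistically fetch the last 512 KiB of each parquet file, which
        // typically covers both the 8-byte footer and the metadata, so the
        // metadata can usually be decoded from a single range request.
        let format = ParquetFormat::default().with_metadata_size_hint(512 * 1024);
        assert_eq!(format.metadata_size_hint(), Some(512 * 1024));
    }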

diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 19794f6ca..d23602c4b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use arrow::datatypes::Schema;
 use arrow::datatypes::SchemaRef;
 use async_trait::async_trait;
+use bytes::{BufMut, BytesMut};
 use datafusion_common::DataFusionError;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
@@ -52,12 +53,14 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
 #[derive(Debug)]
 pub struct ParquetFormat {
     enable_pruning: bool,
+    metadata_size_hint: Option<usize>,
 }
 
 impl Default for ParquetFormat {
     fn default() -> Self {
         Self {
             enable_pruning: true,
+            metadata_size_hint: None,
         }
     }
 }
@@ -69,10 +72,24 @@ impl ParquetFormat {
         self.enable_pruning = enable;
         self
     }
+
+    /// Provide a hint as to the size of the file metadata. If a hint is provided,
+    /// the reader will optimistically try to fetch the last `size_hint` bytes of the parquet file.
+    /// Without a hint, two reads are required: one to fetch the 8-byte parquet footer, and
+    /// another to fetch the metadata, whose length is encoded in the footer.
+    pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
+        self.metadata_size_hint = Some(size_hint);
+        self
+    }
     /// Return true if pruning is enabled
     pub fn enable_pruning(&self) -> bool {
         self.enable_pruning
     }
+
+    /// Return the metadata size hint if set
+    pub fn metadata_size_hint(&self) -> Option<usize> {
+        self.metadata_size_hint
+    }
 }
 
 #[async_trait]
@@ -88,7 +105,8 @@ impl FileFormat for ParquetFormat {
     ) -> Result<SchemaRef> {
         let mut schemas = Vec::with_capacity(objects.len());
         for object in objects {
-            let schema = fetch_schema(store.as_ref(), object).await?;
+            let schema =
+                fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
             schemas.push(schema)
         }
         let schema = Schema::try_merge(schemas)?;
@@ -101,7 +119,13 @@ impl FileFormat for ParquetFormat {
         table_schema: SchemaRef,
         object: &ObjectMeta,
     ) -> Result<Statistics> {
-        let stats = fetch_statistics(store.as_ref(), table_schema, object).await?;
+        let stats = fetch_statistics(
+            store.as_ref(),
+            table_schema,
+            object,
+            self.metadata_size_hint,
+        )
+        .await?;
         Ok(stats)
     }
 
@@ -119,7 +143,11 @@ impl FileFormat for ParquetFormat {
             None
         };
 
-        Ok(Arc::new(ParquetExec::new(conf, predicate)))
+        Ok(Arc::new(ParquetExec::new(
+            conf,
+            predicate,
+            self.metadata_size_hint(),
+        )))
     }
 }
 
@@ -290,6 +318,7 @@ fn summarize_min_max(
 pub(crate) async fn fetch_parquet_metadata(
     store: &dyn ObjectStore,
     meta: &ObjectMeta,
+    size_hint: Option<usize>,
 ) -> Result<ParquetMetaData> {
     if meta.size < 8 {
         return Err(DataFusionError::Execution(format!(
@@ -298,13 +327,22 @@ pub(crate) async fn fetch_parquet_metadata(
         )));
     }
 
-    let footer_start = meta.size - 8;
+    // If a size hint is provided, read more than the minimum size
+    // to try and avoid a second fetch.
+    let footer_start = if let Some(size_hint) = size_hint {
+        meta.size.saturating_sub(size_hint)
+    } else {
+        meta.size - 8
+    };
+
     let suffix = store
         .get_range(&meta.location, footer_start..meta.size)
         .await?;
 
+    let suffix_len = suffix.len();
+
     let mut footer = [0; 8];
-    footer.copy_from_slice(suffix.as_ref());
+    footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
 
     let length = decode_footer(&footer)?;
 
@@ -316,17 +354,35 @@ pub(crate) async fn fetch_parquet_metadata(
         )));
     }
 
-    let metadata_start = meta.size - length - 8;
-    let metadata = store
-        .get_range(&meta.location, metadata_start..footer_start)
-        .await?;
+    // The initial read did not fetch the entire file metadata, so a second request is needed
+    if length > suffix_len - 8 {
+        let metadata_start = meta.size - length - 8;
+        let remaining_metadata = store
+            .get_range(&meta.location, metadata_start..footer_start)
+            .await?;
+
+        let mut metadata = BytesMut::with_capacity(length);
 
-    Ok(decode_metadata(metadata.as_ref())?)
+        metadata.put(remaining_metadata.as_ref());
+        metadata.put(&suffix[..suffix_len - 8]);
+
+        Ok(decode_metadata(metadata.as_ref())?)
+    } else {
+        let metadata_start = meta.size - length - 8;
+
+        Ok(decode_metadata(
+            &suffix[metadata_start - footer_start..suffix_len - 8],
+        )?)
+    }
 }
 
 /// Read and parse the schema of the Parquet file at location `path`
-async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
-    let metadata = fetch_parquet_metadata(store, file).await?;
+async fn fetch_schema(
+    store: &dyn ObjectStore,
+    file: &ObjectMeta,
+    metadata_size_hint: Option<usize>,
+) -> Result<Schema> {
+    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
     let file_metadata = metadata.file_metadata();
     let schema = parquet_to_arrow_schema(
         file_metadata.schema_descr(),
@@ -340,8 +396,9 @@ async fn fetch_statistics(
     store: &dyn ObjectStore,
     table_schema: SchemaRef,
     file: &ObjectMeta,
+    metadata_size_hint: Option<usize>,
 ) -> Result<Statistics> {
-    let metadata = fetch_parquet_metadata(store, file).await?;
+    let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
     let file_metadata = metadata.file_metadata();
 
     let file_schema = parquet_to_arrow_schema(
@@ -458,6 +515,9 @@ pub(crate) mod test_util {
 mod tests {
     use super::super::test_util::scan_format;
     use crate::physical_plan::collect;
+    use std::fmt::{Display, Formatter};
+    use std::ops::Range;
+    use std::sync::atomic::{AtomicUsize, Ordering};
 
     use super::*;
 
@@ -469,9 +529,14 @@ mod tests {
         StringArray, TimestampNanosecondArray,
     };
     use arrow::record_batch::RecordBatch;
+    use async_trait::async_trait;
+    use bytes::Bytes;
     use datafusion_common::ScalarValue;
+    use futures::stream::BoxStream;
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
+    use object_store::path::Path;
+    use object_store::{GetResult, ListResult};
 
     #[tokio::test]
     async fn read_merged_batches() -> Result<()> {
@@ -489,7 +554,8 @@ mod tests {
         let format = ParquetFormat::default();
         let schema = format.infer_schema(&store, &meta).await.unwrap();
 
-        let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?;
+        let stats =
+            fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;
 
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
@@ -497,7 +563,7 @@ mod tests {
         assert_eq!(c1_stats.null_count, Some(1));
         assert_eq!(c2_stats.null_count, Some(3));
 
-        let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?;
+        let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?;
         assert_eq!(stats.num_rows, Some(3));
         let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
         let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -509,6 +575,172 @@ mod tests {
         Ok(())
     }
 
+    #[derive(Debug)]
+    struct RequestCountingObjectStore {
+        inner: Arc<dyn ObjectStore>,
+        request_count: AtomicUsize,
+    }
+
+    impl Display for RequestCountingObjectStore {
+        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+            write!(f, "RequestCounting({})", self.inner)
+        }
+    }
+
+    impl RequestCountingObjectStore {
+        pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
+            Self {
+                inner,
+                request_count: Default::default(),
+            }
+        }
+
+        pub fn request_count(&self) -> usize {
+            self.request_count.load(Ordering::SeqCst)
+        }
+
+        pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
+            self.clone()
+        }
+    }
+
+    #[async_trait]
+    impl ObjectStore for RequestCountingObjectStore {
+        async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn get_range(
+            &self,
+            location: &Path,
+            range: Range<usize>,
+        ) -> object_store::Result<Bytes> {
+            self.request_count.fetch_add(1, Ordering::SeqCst);
+            self.inner.get_range(location, range).await
+        }
+
+        async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn delete(&self, _location: &Path) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn list(
+            &self,
+            _prefix: Option<&Path>,
+        ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
+        {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn list_with_delimiter(
+            &self,
+            _prefix: Option<&Path>,
+        ) -> object_store::Result<ListResult> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+
+        async fn copy_if_not_exists(
+            &self,
+            _from: &Path,
+            _to: &Path,
+        ) -> object_store::Result<()> {
+            Err(object_store::Error::NotImplemented)
+        }
+    }
+
+    #[tokio::test]
+    async fn fetch_metadata_with_size_hint() -> Result<()> {
+        let c1: ArrayRef =
+            Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+        let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+        let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+        let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
+        let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+
+        // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
+        // for the remaining metadata
+        fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
+            .await
+            .expect("error reading metadata with hint");
+
+        assert_eq!(store.request_count(), 2);
+
+        let format = ParquetFormat::default().with_metadata_size_hint(9);
+        let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
+
+        let stats =
+            fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
+                .await?;
+
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(1));
+        assert_eq!(c2_stats.null_count, Some(3));
+
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
+
+        // Use the file size as the hint so we can get the full metadata from the first fetch
+        let size_hint = meta[0].size;
+
+        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+            .await
+            .expect("error reading metadata with hint");
+
+        // ensure the requests were coalesced into a single request
+        assert_eq!(store.request_count(), 1);
+
+        let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
+        let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
+        let stats = fetch_statistics(
+            store.upcast().as_ref(),
+            schema.clone(),
+            &meta[0],
+            Some(size_hint),
+        )
+        .await?;
+
+        assert_eq!(stats.num_rows, Some(3));
+        let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+        let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+        assert_eq!(c1_stats.null_count, Some(1));
+        assert_eq!(c2_stats.null_count, Some(3));
+
+        let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+            LocalFileSystem::new(),
+        )));
+
+        // Use a size hint larger than the file size to make sure we don't panic
+        let size_hint = meta[0].size + 100;
+
+        fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+            .await
+            .expect("error reading metadata with hint");
+
+        assert_eq!(store.request_count(), 1);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn read_small_batches() -> Result<()> {
         let config = SessionConfig::new().with_batch_size(2);
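
A note on the read pattern in fetch_parquet_metadata above: the hint only saves the second range request when it covers the footer plus the whole metadata. A standalone sketch of that arithmetic, with made-up numbers (file size, hint, and metadata length are all hypothetical):

    // Mirrors the branch in fetch_parquet_metadata above; a hint of at least
    // 8 bytes is assumed, as in the function itself.
    fn needs_second_fetch(file_size: usize, size_hint: usize, metadata_len: usize) -> bool {
        // The first range request covers footer_start..file_size; saturating_sub
        // guards against hints larger than the file size.
        let footer_start = file_size.saturating_sub(size_hint);
        let suffix_len = file_size - footer_start;
        // The suffix ends with the 8-byte footer, so at most suffix_len - 8 bytes
        // of metadata were already fetched; anything longer needs a second request.
        metadata_len > suffix_len - 8
    }

    fn main() {
        // 1 MiB file, 64 KiB hint, 40 KiB of metadata: one request is enough.
        assert!(!needs_second_fetch(1 << 20, 64 * 1024, 40 * 1024));
        // The same file with a 9-byte hint prefetches only 1 byte of metadata,
        // so a second request is required (the Some(9) case in the test above).
        assert!(needs_second_fetch(1 << 20, 9, 40 * 1024));
        // A hint larger than the file clamps footer_start to 0 and does not panic.
        assert!(!needs_second_fetch(4096, 4096 + 100, 1024));
    }
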
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index e9e14abf6..ec2a1355f 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -271,6 +271,7 @@ mod tests {
                 table_partition_cols: vec![],
             },
             None,
+            None,
         ))
     }
 
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index cae92ddbb..b2239876f 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -75,6 +75,8 @@ pub struct ParquetExec {
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for pruning row groups
     pruning_predicate: Option<PruningPredicate>,
+    /// Optional hint for the size of the parquet metadata
+    metadata_size_hint: Option<usize>,
 }
 
 /// Stores metrics about the parquet execution for a particular parquet file
@@ -90,7 +92,11 @@ struct ParquetFileMetrics {
 
 impl ParquetExec {
     /// Create a new Parquet reader execution plan provided file list and schema.
-    pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
+    pub fn new(
+        base_config: FileScanConfig,
+        predicate: Option<Expr>,
+        metadata_size_hint: Option<usize>,
+    ) -> Self {
         debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
         base_config.file_groups, base_config.projection, predicate, base_config.limit);
 
@@ -120,6 +126,7 @@ impl ParquetExec {
             projected_statistics,
             metrics,
             pruning_predicate,
+            metadata_size_hint,
         }
     }
 
@@ -212,6 +219,7 @@ impl ExecutionPlan for ParquetExec {
             batch_size: context.session_config().batch_size(),
             pruning_predicate: self.pruning_predicate.clone(),
             table_schema: self.base_config.file_schema.clone(),
+            metadata_size_hint: self.metadata_size_hint,
             metrics: self.metrics.clone(),
         };
 
@@ -266,6 +274,7 @@ struct ParquetOpener {
     batch_size: usize,
     pruning_predicate: Option<PruningPredicate>,
     table_schema: SchemaRef,
+    metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
 }
 
@@ -285,6 +294,7 @@ impl FormatReader for ParquetOpener {
         let reader = ParquetFileReader {
             store,
             meta,
+            metadata_size_hint: self.metadata_size_hint,
             metrics: metrics.clone(),
         };
 
@@ -331,6 +341,7 @@ impl FormatReader for ParquetOpener {
 struct ParquetFileReader {
     store: Arc<dyn ObjectStore>,
     meta: ObjectMeta,
+    metadata_size_hint: Option<usize>,
     metrics: ParquetFileMetrics,
 }
 
@@ -353,14 +364,18 @@ impl AsyncFileReader for ParquetFileReader {
         &mut self,
     ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
         Box::pin(async move {
-            let metadata = fetch_parquet_metadata(self.store.as_ref(), &self.meta)
-                .await
-                .map_err(|e| {
-                    ParquetError::General(format!(
-                        "AsyncChunkReader::get_metadata error: {}",
-                        e
-                    ))
-                })?;
+            let metadata = fetch_parquet_metadata(
+                self.store.as_ref(),
+                &self.meta,
+                self.metadata_size_hint,
+            )
+            .await
+            .map_err(|e| {
+                ParquetError::General(format!(
+                    "AsyncChunkReader::get_metadata error: {}",
+                    e
+                ))
+            })?;
             Ok(Arc::new(metadata))
         })
     }
@@ -618,6 +633,7 @@ mod tests {
                 table_partition_cols: vec![],
             },
             predicate,
+            None,
         );
 
         let session_ctx = SessionContext::new();
@@ -1004,6 +1020,7 @@ mod tests {
                     table_partition_cols: vec![],
                 },
                 None,
+                None,
             );
             assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
             let results = parquet_exec.execute(0, task_ctx)?.next().await;
@@ -1103,6 +1120,7 @@ mod tests {
                 ],
             },
             None,
+            None,
         );
         assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
 
@@ -1159,6 +1177,7 @@ mod tests {
                 table_partition_cols: vec![],
             },
             None,
+            None,
         );
 
         let mut results = parquet_exec.execute(0, task_ctx)?;