You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/07/21 10:26:06 UTC
[arrow-datafusion] branch master updated: Add metadata_size_hint for optimistic fetching of parquet metadata (#2946)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 834924f27 Add metadata_size_hint for optimistic fetching of parquet metadata (#2946)
834924f27 is described below
commit 834924f274d9e04444030b8eb3e07e1035fcd3cf
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Thu Jul 21 06:26:00 2022 -0400
Add metadata_size_hint for optimistic fetching of parquet metadata (#2946)
* Add metadata_size_hint for optimistic fetching of parquet metadata
* Formatting
* Update datafusion/core/src/datasource/file_format/parquet.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* Update datafusion/core/src/datasource/file_format/parquet.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
* PR comments: Guard against size_hint larger than file size and verify request counts in unit test
* Update datafusion/core/src/datasource/file_format/parquet.rs
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
.../core/src/datasource/file_format/parquet.rs | 262 +++++++++++++++++++--
.../core/src/physical_optimizer/repartition.rs | 1 +
.../core/src/physical_plan/file_format/parquet.rs | 37 ++-
3 files changed, 276 insertions(+), 24 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 19794f6ca..d23602c4b 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
use arrow::datatypes::Schema;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
+use bytes::{BufMut, BytesMut};
use datafusion_common::DataFusionError;
use hashbrown::HashMap;
use object_store::{ObjectMeta, ObjectStore};
@@ -52,12 +53,14 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
#[derive(Debug)]
pub struct ParquetFormat {
enable_pruning: bool,
+ metadata_size_hint: Option<usize>,
}
impl Default for ParquetFormat {
fn default() -> Self {
Self {
enable_pruning: true,
+ metadata_size_hint: None,
}
}
}
@@ -69,10 +72,24 @@ impl ParquetFormat {
self.enable_pruning = enable;
self
}
+
+ /// Provide a hint of the size of the file metadata. If a hint is provided
+ /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
+ /// Without a hint, two reads are required: one read to fetch the 8-byte parquet footer and then
+ /// another read to fetch the metadata length encoded in the footer.
+ pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
+ self.metadata_size_hint = Some(size_hint);
+ self
+ }
/// Return true if pruning is enabled
pub fn enable_pruning(&self) -> bool {
self.enable_pruning
}
+
+ /// Return the metadata size hint if set
+ pub fn metadata_size_hint(&self) -> Option<usize> {
+ self.metadata_size_hint
+ }
}
#[async_trait]
@@ -88,7 +105,8 @@ impl FileFormat for ParquetFormat {
) -> Result<SchemaRef> {
let mut schemas = Vec::with_capacity(objects.len());
for object in objects {
- let schema = fetch_schema(store.as_ref(), object).await?;
+ let schema =
+ fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
schemas.push(schema)
}
let schema = Schema::try_merge(schemas)?;
@@ -101,7 +119,13 @@ impl FileFormat for ParquetFormat {
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics> {
- let stats = fetch_statistics(store.as_ref(), table_schema, object).await?;
+ let stats = fetch_statistics(
+ store.as_ref(),
+ table_schema,
+ object,
+ self.metadata_size_hint,
+ )
+ .await?;
Ok(stats)
}
@@ -119,7 +143,11 @@ impl FileFormat for ParquetFormat {
None
};
- Ok(Arc::new(ParquetExec::new(conf, predicate)))
+ Ok(Arc::new(ParquetExec::new(
+ conf,
+ predicate,
+ self.metadata_size_hint(),
+ )))
}
}
@@ -290,6 +318,7 @@ fn summarize_min_max(
pub(crate) async fn fetch_parquet_metadata(
store: &dyn ObjectStore,
meta: &ObjectMeta,
+ size_hint: Option<usize>,
) -> Result<ParquetMetaData> {
if meta.size < 8 {
return Err(DataFusionError::Execution(format!(
@@ -298,13 +327,22 @@ pub(crate) async fn fetch_parquet_metadata(
)));
}
- let footer_start = meta.size - 8;
+ // If a size hint is provided, read more than the minimum size
+ // to try to avoid a second fetch.
+ let footer_start = if let Some(size_hint) = size_hint {
+ meta.size.saturating_sub(size_hint)
+ } else {
+ meta.size - 8
+ };
+
let suffix = store
.get_range(&meta.location, footer_start..meta.size)
.await?;
+ let suffix_len = suffix.len();
+
let mut footer = [0; 8];
- footer.copy_from_slice(suffix.as_ref());
+ footer.copy_from_slice(&suffix[suffix_len - 8..suffix_len]);
let length = decode_footer(&footer)?;
@@ -316,17 +354,35 @@ pub(crate) async fn fetch_parquet_metadata(
)));
}
- let metadata_start = meta.size - length - 8;
- let metadata = store
- .get_range(&meta.location, metadata_start..footer_start)
- .await?;
+ // Did not fetch the entire file metadata in the initial read, need to make a second request
+ if length > suffix_len - 8 {
+ let metadata_start = meta.size - length - 8;
+ let remaining_metadata = store
+ .get_range(&meta.location, metadata_start..footer_start)
+ .await?;
+
+ let mut metadata = BytesMut::with_capacity(length);
- Ok(decode_metadata(metadata.as_ref())?)
+ metadata.put(remaining_metadata.as_ref());
+ metadata.put(&suffix[..suffix_len - 8]);
+
+ Ok(decode_metadata(metadata.as_ref())?)
+ } else {
+ let metadata_start = meta.size - length - 8;
+
+ Ok(decode_metadata(
+ &suffix[metadata_start - footer_start..suffix_len - 8],
+ )?)
+ }
}
/// Read and parse the schema of the Parquet file at location `path`
-async fn fetch_schema(store: &dyn ObjectStore, file: &ObjectMeta) -> Result<Schema> {
- let metadata = fetch_parquet_metadata(store, file).await?;
+async fn fetch_schema(
+ store: &dyn ObjectStore,
+ file: &ObjectMeta,
+ metadata_size_hint: Option<usize>,
+) -> Result<Schema> {
+ let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();
let schema = parquet_to_arrow_schema(
file_metadata.schema_descr(),
@@ -340,8 +396,9 @@ async fn fetch_statistics(
store: &dyn ObjectStore,
table_schema: SchemaRef,
file: &ObjectMeta,
+ metadata_size_hint: Option<usize>,
) -> Result<Statistics> {
- let metadata = fetch_parquet_metadata(store, file).await?;
+ let metadata = fetch_parquet_metadata(store, file, metadata_size_hint).await?;
let file_metadata = metadata.file_metadata();
let file_schema = parquet_to_arrow_schema(
@@ -458,6 +515,9 @@ pub(crate) mod test_util {
mod tests {
use super::super::test_util::scan_format;
use crate::physical_plan::collect;
+ use std::fmt::{Display, Formatter};
+ use std::ops::Range;
+ use std::sync::atomic::{AtomicUsize, Ordering};
use super::*;
@@ -469,9 +529,14 @@ mod tests {
StringArray, TimestampNanosecondArray,
};
use arrow::record_batch::RecordBatch;
+ use async_trait::async_trait;
+ use bytes::Bytes;
use datafusion_common::ScalarValue;
+ use futures::stream::BoxStream;
use futures::StreamExt;
use object_store::local::LocalFileSystem;
+ use object_store::path::Path;
+ use object_store::{GetResult, ListResult};
#[tokio::test]
async fn read_merged_batches() -> Result<()> {
@@ -489,7 +554,8 @@ mod tests {
let format = ParquetFormat::default();
let schema = format.infer_schema(&store, &meta).await.unwrap();
- let stats = fetch_statistics(store.as_ref(), schema.clone(), &meta[0]).await?;
+ let stats =
+ fetch_statistics(store.as_ref(), schema.clone(), &meta[0], None).await?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
@@ -497,7 +563,7 @@ mod tests {
assert_eq!(c1_stats.null_count, Some(1));
assert_eq!(c2_stats.null_count, Some(3));
- let stats = fetch_statistics(store.as_ref(), schema, &meta[1]).await?;
+ let stats = fetch_statistics(store.as_ref(), schema, &meta[1], None).await?;
assert_eq!(stats.num_rows, Some(3));
let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
@@ -509,6 +575,172 @@ mod tests {
Ok(())
}
+ #[derive(Debug)]
+ struct RequestCountingObjectStore {
+ inner: Arc<dyn ObjectStore>,
+ request_count: AtomicUsize,
+ }
+
+ impl Display for RequestCountingObjectStore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(f, "RequestCounting({})", self.inner)
+ }
+ }
+
+ impl RequestCountingObjectStore {
+ pub fn new(inner: Arc<dyn ObjectStore>) -> Self {
+ Self {
+ inner,
+ request_count: Default::default(),
+ }
+ }
+
+ pub fn request_count(&self) -> usize {
+ self.request_count.load(Ordering::SeqCst)
+ }
+
+ pub fn upcast(self: &Arc<Self>) -> Arc<dyn ObjectStore> {
+ self.clone()
+ }
+ }
+
+ #[async_trait]
+ impl ObjectStore for RequestCountingObjectStore {
+ async fn put(&self, _location: &Path, _bytes: Bytes) -> object_store::Result<()> {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn get(&self, _location: &Path) -> object_store::Result<GetResult> {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn get_range(
+ &self,
+ location: &Path,
+ range: Range<usize>,
+ ) -> object_store::Result<Bytes> {
+ self.request_count.fetch_add(1, Ordering::SeqCst);
+ self.inner.get_range(location, range).await
+ }
+
+ async fn head(&self, _location: &Path) -> object_store::Result<ObjectMeta> {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn delete(&self, _location: &Path) -> object_store::Result<()> {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn list(
+ &self,
+ _prefix: Option<&Path>,
+ ) -> object_store::Result<BoxStream<'_, object_store::Result<ObjectMeta>>>
+ {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn list_with_delimiter(
+ &self,
+ _prefix: Option<&Path>,
+ ) -> object_store::Result<ListResult> {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn copy(&self, _from: &Path, _to: &Path) -> object_store::Result<()> {
+ Err(object_store::Error::NotImplemented)
+ }
+
+ async fn copy_if_not_exists(
+ &self,
+ _from: &Path,
+ _to: &Path,
+ ) -> object_store::Result<()> {
+ Err(object_store::Error::NotImplemented)
+ }
+ }
+
+ #[tokio::test]
+ async fn fetch_metadata_with_size_hint() -> Result<()> {
+ let c1: ArrayRef =
+ Arc::new(StringArray::from(vec![Some("Foo"), None, Some("bar")]));
+
+ let c2: ArrayRef = Arc::new(Int64Array::from(vec![Some(1), Some(2), None]));
+
+ let batch1 = RecordBatch::try_from_iter(vec![("c1", c1.clone())]).unwrap();
+ let batch2 = RecordBatch::try_from_iter(vec![("c2", c2)]).unwrap();
+
+ let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+ LocalFileSystem::new(),
+ )));
+ let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
+
+ // Use a size hint larger than the parquet footer but smaller than the actual metadata, requiring a second fetch
+ // for the remaining metadata
+ fetch_parquet_metadata(store.as_ref() as &dyn ObjectStore, &meta[0], Some(9))
+ .await
+ .expect("error reading metadata with hint");
+
+ assert_eq!(store.request_count(), 2);
+
+ let format = ParquetFormat::default().with_metadata_size_hint(9);
+ let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
+
+ let stats =
+ fetch_statistics(store.upcast().as_ref(), schema.clone(), &meta[0], Some(9))
+ .await?;
+
+ assert_eq!(stats.num_rows, Some(3));
+ let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+ let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+ assert_eq!(c1_stats.null_count, Some(1));
+ assert_eq!(c2_stats.null_count, Some(3));
+
+ let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+ LocalFileSystem::new(),
+ )));
+
+ // Use the file size as the hint so we can get the full metadata from the first fetch
+ let size_hint = meta[0].size;
+
+ fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+ .await
+ .expect("error reading metadata with hint");
+
+ // ensure the requests were coalesced into a single request
+ assert_eq!(store.request_count(), 1);
+
+ let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
+ let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
+ let stats = fetch_statistics(
+ store.upcast().as_ref(),
+ schema.clone(),
+ &meta[0],
+ Some(size_hint),
+ )
+ .await?;
+
+ assert_eq!(stats.num_rows, Some(3));
+ let c1_stats = &stats.column_statistics.as_ref().expect("missing c1 stats")[0];
+ let c2_stats = &stats.column_statistics.as_ref().expect("missing c2 stats")[1];
+ assert_eq!(c1_stats.null_count, Some(1));
+ assert_eq!(c2_stats.null_count, Some(3));
+
+ let store = Arc::new(RequestCountingObjectStore::new(Arc::new(
+ LocalFileSystem::new(),
+ )));
+
+ // Use a size hint larger than the file size to make sure we don't panic
+ let size_hint = meta[0].size + 100;
+
+ fetch_parquet_metadata(store.upcast().as_ref(), &meta[0], Some(size_hint))
+ .await
+ .expect("error reading metadata with hint");
+
+ assert_eq!(store.request_count(), 1);
+
+ Ok(())
+ }
+
#[tokio::test]
async fn read_small_batches() -> Result<()> {
let config = SessionConfig::new().with_batch_size(2);
diff --git a/datafusion/core/src/physical_optimizer/repartition.rs b/datafusion/core/src/physical_optimizer/repartition.rs
index e9e14abf6..ec2a1355f 100644
--- a/datafusion/core/src/physical_optimizer/repartition.rs
+++ b/datafusion/core/src/physical_optimizer/repartition.rs
@@ -271,6 +271,7 @@ mod tests {
table_partition_cols: vec![],
},
None,
+ None,
))
}
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index cae92ddbb..b2239876f 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -75,6 +75,8 @@ pub struct ParquetExec {
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for pruning row groups
pruning_predicate: Option<PruningPredicate>,
+ /// Optional hint for the size of the parquet metadata
+ metadata_size_hint: Option<usize>,
}
/// Stores metrics about the parquet execution for a particular parquet file
@@ -90,7 +92,11 @@ struct ParquetFileMetrics {
impl ParquetExec {
/// Create a new Parquet reader execution plan provided file list and schema.
- pub fn new(base_config: FileScanConfig, predicate: Option<Expr>) -> Self {
+ pub fn new(
+ base_config: FileScanConfig,
+ predicate: Option<Expr>,
+ metadata_size_hint: Option<usize>,
+ ) -> Self {
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
base_config.file_groups, base_config.projection, predicate, base_config.limit);
@@ -120,6 +126,7 @@ impl ParquetExec {
projected_statistics,
metrics,
pruning_predicate,
+ metadata_size_hint,
}
}
@@ -212,6 +219,7 @@ impl ExecutionPlan for ParquetExec {
batch_size: context.session_config().batch_size(),
pruning_predicate: self.pruning_predicate.clone(),
table_schema: self.base_config.file_schema.clone(),
+ metadata_size_hint: self.metadata_size_hint,
metrics: self.metrics.clone(),
};
@@ -266,6 +274,7 @@ struct ParquetOpener {
batch_size: usize,
pruning_predicate: Option<PruningPredicate>,
table_schema: SchemaRef,
+ metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
}
@@ -285,6 +294,7 @@ impl FormatReader for ParquetOpener {
let reader = ParquetFileReader {
store,
meta,
+ metadata_size_hint: self.metadata_size_hint,
metrics: metrics.clone(),
};
@@ -331,6 +341,7 @@ impl FormatReader for ParquetOpener {
struct ParquetFileReader {
store: Arc<dyn ObjectStore>,
meta: ObjectMeta,
+ metadata_size_hint: Option<usize>,
metrics: ParquetFileMetrics,
}
@@ -353,14 +364,18 @@ impl AsyncFileReader for ParquetFileReader {
&mut self,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
- let metadata = fetch_parquet_metadata(self.store.as_ref(), &self.meta)
- .await
- .map_err(|e| {
- ParquetError::General(format!(
- "AsyncChunkReader::get_metadata error: {}",
- e
- ))
- })?;
+ let metadata = fetch_parquet_metadata(
+ self.store.as_ref(),
+ &self.meta,
+ self.metadata_size_hint,
+ )
+ .await
+ .map_err(|e| {
+ ParquetError::General(format!(
+ "AsyncChunkReader::get_metadata error: {}",
+ e
+ ))
+ })?;
Ok(Arc::new(metadata))
})
}
@@ -618,6 +633,7 @@ mod tests {
table_partition_cols: vec![],
},
predicate,
+ None,
);
let session_ctx = SessionContext::new();
@@ -1004,6 +1020,7 @@ mod tests {
table_partition_cols: vec![],
},
None,
+ None,
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
let results = parquet_exec.execute(0, task_ctx)?.next().await;
@@ -1103,6 +1120,7 @@ mod tests {
],
},
None,
+ None,
);
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);
@@ -1159,6 +1177,7 @@ mod tests {
table_partition_cols: vec![],
},
None,
+ None,
);
let mut results = parquet_exec.execute(0, task_ctx)?;