You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/10/03 13:46:07 UTC
[arrow-datafusion] branch master updated: Cache collected file statistics (#3649)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 85c11c18e Cache collected file statistics (#3649)
85c11c18e is described below
commit 85c11c18e20b41feb4255adc6bafc86423bb7209
Author: mateuszkj <ma...@kondej.net>
AuthorDate: Mon Oct 3 15:46:00 2022 +0200
Cache collected file statistics (#3649)
* Cache collected file statistics
* fix clippy in tests
---
datafusion/core/src/datasource/listing/table.rs | 93 +++++++++++++++++++++++--
1 file changed, 88 insertions(+), 5 deletions(-)
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index e77773a82..0f93b610f 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,11 +17,15 @@
//! The table implementation.
+use ahash::HashMap;
use std::{any::Any, sync::Arc};
use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use futures::{future, stream, StreamExt, TryStreamExt};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parking_lot::RwLock;
use crate::datasource::{
file_format::{
@@ -237,6 +241,36 @@ impl ListingOptions {
}
}
+/// Collected statistics for files
+/// A cached entry is invalidated when the file size or last modification time has changed
+#[derive(Default)]
+struct StatisticsCache {
+ statistics: RwLock<HashMap<Path, (ObjectMeta, Statistics)>>,
+}
+
+impl StatisticsCache {
+ /// Get `Statistics` for file location. Returns `None` if the file has changed or is not found.
+ fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
+ let map = self.statistics.read();
+ let (saved_meta, statistics) = map.get(&meta.location)?;
+
+ if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified
+ {
+ // file has changed
+ return None;
+ }
+
+ Some(statistics.clone())
+ }
+
+ /// Save collected file statistics
+ fn save(&self, meta: ObjectMeta, statistics: Statistics) {
+ self.statistics
+ .write()
+ .insert(meta.location.clone(), (meta, statistics));
+ }
+}
+
/// An implementation of `TableProvider` that uses the object store
/// or file system listing capability to get the list of files.
pub struct ListingTable {
@@ -247,6 +281,7 @@ pub struct ListingTable {
table_schema: SchemaRef,
options: ListingOptions,
definition: Option<String>,
+ collected_statistics: StatisticsCache,
}
impl ListingTable {
@@ -282,6 +317,7 @@ impl ListingTable {
table_schema: Arc::new(Schema::new(table_fields)),
options,
definition: None,
+ collected_statistics: Default::default(),
};
Ok(table)
@@ -400,14 +436,26 @@ impl ListingTable {
let file_list = stream::iter(file_list).flatten();
// collect the statistics if required by the config
- // TODO: Collect statistics and schema in single-pass
let files = file_list.then(|part_file| async {
let part_file = part_file?;
let statistics = if self.options.collect_stat {
- self.options
- .format
- .infer_stats(&store, self.file_schema.clone(), &part_file.object_meta)
- .await?
+ match self.collected_statistics.get(&part_file.object_meta) {
+ Some(statistics) => statistics,
+ None => {
+ let statistics = self
+ .options
+ .format
+ .infer_stats(
+ &store,
+ self.file_schema.clone(),
+ &part_file.object_meta,
+ )
+ .await?;
+ self.collected_statistics
+ .save(part_file.object_meta.clone(), statistics.clone());
+ statistics
+ }
+ }
} else {
Statistics::default()
};
@@ -434,6 +482,7 @@ mod tests {
test::{columns, object_store::register_test_store},
};
use arrow::datatypes::DataType;
+ use chrono::DateTime;
use super::*;
@@ -752,4 +801,38 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn test_statistics_cache() {
+ let meta = ObjectMeta {
+ location: Path::from("test"),
+ last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
+ .unwrap()
+ .into(),
+ size: 1024,
+ };
+
+ let cache = StatisticsCache::default();
+ assert!(cache.get(&meta).is_none());
+
+ cache.save(meta.clone(), Statistics::default());
+ assert!(cache.get(&meta).is_some());
+
+ // file size changed
+ let mut meta2 = meta.clone();
+ meta2.size = 2048;
+ assert!(cache.get(&meta2).is_none());
+
+ // file last_modified changed
+ let mut meta2 = meta.clone();
+ meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
+ .unwrap()
+ .into();
+ assert!(cache.get(&meta2).is_none());
+
+ // different file
+ let mut meta2 = meta;
+ meta2.location = Path::from("test2");
+ assert!(cache.get(&meta2).is_none());
+ }
}