Posted to commits@arrow.apache.org by al...@apache.org on 2022/10/03 13:46:07 UTC

[arrow-datafusion] branch master updated: Cache collected file statistics (#3649)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 85c11c18e Cache collected file statistics (#3649)
85c11c18e is described below

commit 85c11c18e20b41feb4255adc6bafc86423bb7209
Author: mateuszkj <ma...@kondej.net>
AuthorDate: Mon Oct 3 15:46:00 2022 +0200

    Cache collected file statistics (#3649)
    
    * Cache collected file statistics
    
    * fix clippy in tests
---
 datafusion/core/src/datasource/listing/table.rs | 93 +++++++++++++++++++++++--
 1 file changed, 88 insertions(+), 5 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index e77773a82..0f93b610f 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,11 +17,15 @@
 
 //! The table implementation.
 
+use ahash::HashMap;
 use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use async_trait::async_trait;
 use futures::{future, stream, StreamExt, TryStreamExt};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parking_lot::RwLock;
 
 use crate::datasource::{
     file_format::{
@@ -237,6 +241,36 @@ impl ListingOptions {
     }
 }
 
+/// Collected statistics for files
+/// Cache is invalidated when the file size or last modification time has changed
+#[derive(Default)]
+struct StatisticsCache {
+    statistics: RwLock<HashMap<Path, (ObjectMeta, Statistics)>>,
+}
+
+impl StatisticsCache {
+    /// Get `Statistics` for a file location. Returns `None` if the file has changed or was not found.
+    fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
+        let map = self.statistics.read();
+        let (saved_meta, statistics) = map.get(&meta.location)?;
+
+        if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified
+        {
+            // file has changed
+            return None;
+        }
+
+        Some(statistics.clone())
+    }
+
+    /// Save collected file statistics
+    fn save(&self, meta: ObjectMeta, statistics: Statistics) {
+        self.statistics
+            .write()
+            .insert(meta.location.clone(), (meta, statistics));
+    }
+}
+
 /// An implementation of `TableProvider` that uses the object store
 /// or file system listing capability to get the list of files.
 pub struct ListingTable {
@@ -247,6 +281,7 @@ pub struct ListingTable {
     table_schema: SchemaRef,
     options: ListingOptions,
     definition: Option<String>,
+    collected_statistics: StatisticsCache,
 }
 
 impl ListingTable {
@@ -282,6 +317,7 @@ impl ListingTable {
             table_schema: Arc::new(Schema::new(table_fields)),
             options,
             definition: None,
+            collected_statistics: Default::default(),
         };
 
         Ok(table)
@@ -400,14 +436,26 @@ impl ListingTable {
         let file_list = stream::iter(file_list).flatten();
 
         // collect the statistics if required by the config
-        // TODO: Collect statistics and schema in single-pass
         let files = file_list.then(|part_file| async {
             let part_file = part_file?;
             let statistics = if self.options.collect_stat {
-                self.options
-                    .format
-                    .infer_stats(&store, self.file_schema.clone(), &part_file.object_meta)
-                    .await?
+                match self.collected_statistics.get(&part_file.object_meta) {
+                    Some(statistics) => statistics,
+                    None => {
+                        let statistics = self
+                            .options
+                            .format
+                            .infer_stats(
+                                &store,
+                                self.file_schema.clone(),
+                                &part_file.object_meta,
+                            )
+                            .await?;
+                        self.collected_statistics
+                            .save(part_file.object_meta.clone(), statistics.clone());
+                        statistics
+                    }
+                }
             } else {
                 Statistics::default()
             };
@@ -434,6 +482,7 @@ mod tests {
         test::{columns, object_store::register_test_store},
     };
     use arrow::datatypes::DataType;
+    use chrono::DateTime;
 
     use super::*;
 
@@ -752,4 +801,38 @@ mod tests {
 
         Ok(())
     }
+
+    #[test]
+    fn test_statistics_cache() {
+        let meta = ObjectMeta {
+            location: Path::from("test"),
+            last_modified: DateTime::parse_from_rfc3339("2022-09-27T22:36:00+02:00")
+                .unwrap()
+                .into(),
+            size: 1024,
+        };
+
+        let cache = StatisticsCache::default();
+        assert!(cache.get(&meta).is_none());
+
+        cache.save(meta.clone(), Statistics::default());
+        assert!(cache.get(&meta).is_some());
+
+        // file size changed
+        let mut meta2 = meta.clone();
+        meta2.size = 2048;
+        assert!(cache.get(&meta2).is_none());
+
+        // file last_modified changed
+        let mut meta2 = meta.clone();
+        meta2.last_modified = DateTime::parse_from_rfc3339("2022-09-27T22:40:00+02:00")
+            .unwrap()
+            .into();
+        assert!(cache.get(&meta2).is_none());
+
+        // different file
+        let mut meta2 = meta;
+        meta2.location = Path::from("test2");
+        assert!(cache.get(&meta2).is_none());
+    }
 }
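
For context, the commit applies a plain lookup-or-compute pattern keyed on `ObjectMeta.location`, with `size` and `last_modified` serving as the invalidation check: if either differs from the cached entry, the stale statistics are discarded and recomputed. Below is a minimal standalone sketch of the same idea. The `FileMeta`, `FileStats`, and `StatsCache` names are hypothetical simplifications (using std's `HashMap` and `RwLock` instead of the `object_store`, `parking_lot`, and DataFusion types used in the actual commit), not code from the repository.

// Sketch only: simplified stand-ins for ObjectMeta / Statistics / StatisticsCache.
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Clone)]
struct FileMeta {
    location: String,
    size: usize,
    last_modified: i64, // seconds since epoch, for simplicity
}

#[derive(Clone)]
struct FileStats {
    num_rows: Option<usize>,
}

#[derive(Default)]
struct StatsCache {
    inner: RwLock<HashMap<String, (FileMeta, FileStats)>>,
}

impl StatsCache {
    // Return cached stats only if size and last_modified still match.
    fn get(&self, meta: &FileMeta) -> Option<FileStats> {
        let map = self.inner.read().unwrap();
        let (saved, stats) = map.get(&meta.location)?;
        if saved.size != meta.size || saved.last_modified != meta.last_modified {
            return None; // file has changed, entry is stale
        }
        Some(stats.clone())
    }

    // Store (or overwrite) the stats for this file, keyed by its location.
    fn save(&self, meta: FileMeta, stats: FileStats) {
        self.inner
            .write()
            .unwrap()
            .insert(meta.location.clone(), (meta, stats));
    }
}

fn main() {
    let cache = StatsCache::default();
    let meta = FileMeta { location: "test".into(), size: 1024, last_modified: 0 };

    // First listing: nothing cached, so compute stats and save them.
    if cache.get(&meta).is_none() {
        cache.save(meta.clone(), FileStats { num_rows: Some(100) });
    }
    // A later listing with unchanged metadata hits the cache.
    assert!(cache.get(&meta).is_some());

    // A changed size (or last_modified) invalidates the entry.
    let changed = FileMeta { size: 2048, ..meta };
    assert!(cache.get(&changed).is_none());
}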