Posted to github@arrow.apache.org by "alamb (via GitHub)" <gi...@apache.org> on 2023/09/15 17:30:43 UTC

[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #7570: [feat] Introduce cacheManager in session ctx and make StatisticsCache share in session

alamb commented on code in PR #7570:
URL: https://github.com/apache/arrow-datafusion/pull/7570#discussion_r1327552035


##########
datafusion/execution/src/cache/mod.rs:
##########
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod cache_manager;
+pub mod cache_unit;
+
+// The cache accessor, users usually working on this interface while manipulating caches
+pub trait CacheAccessor<K, V>: Send + Sync {
+    // Extra info but not part of the cache key or cache value.
+    type Extra: Clone;
+
+    /// Get value from cache.
+    fn get(&self, k: &K) -> Option<V>;
+    /// Get value from cache.
+    fn get_with_extra(&self, k: &K, e: &Self::Extra) -> Option<V>;
+    /// Put value into cache. Returns the old value associated with the key if there was one.
+    fn put(&self, key: &K, value: V) -> Option<V>;
+    /// Put value into cache. Returns the old value associated with the key if there was one.
+    fn put_with_extra(&self, key: &K, value: V, e: &Self::Extra) -> Option<V>;
+    /// Remove an entry from the cache, returning `true` if they existed in the cache.
+    fn evict(&self, k: &K) -> bool;

Review Comment:
   What do you think about following the convention of `HashMap` and other Rust stdlib containers and returning the removed value, if any?
   
   Something like:
   
   ```rust
   fn remove(&self, k: &K) -> Option<V>;
   ```
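A minimal sketch of that convention, assuming a cut-down version of the trait. `MemoryCache` and its `Mutex<HashMap>` backing store are hypothetical, not part of the PR. The sketch keeps `&self`, as the rest of the trait does, and simply forwards to `HashMap::remove`, which already returns the removed value:

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Mutex;

/// Cut-down, hypothetical version of the trait under review.
/// Methods take `&self`; implementations use interior mutability.
pub trait CacheAccessor<K, V>: Send + Sync {
    /// Insert `value`, returning the previous value for `key`, if any.
    fn put(&self, key: &K, value: V) -> Option<V>;
    /// Remove `key`, returning the value it held, if any (like `HashMap::remove`).
    fn remove(&self, key: &K) -> Option<V>;
}

/// Hypothetical in-memory cache backed by a mutex-protected `HashMap`.
pub struct MemoryCache<K, V> {
    map: Mutex<HashMap<K, V>>,
}

impl<K, V> MemoryCache<K, V> {
    pub fn new() -> Self {
        Self { map: Mutex::new(HashMap::new()) }
    }
}

impl<K, V> CacheAccessor<K, V> for MemoryCache<K, V>
where
    K: Eq + Hash + Clone + Send,
    V: Send,
{
    fn put(&self, key: &K, value: V) -> Option<V> {
        self.map.lock().unwrap().insert(key.clone(), value)
    }

    fn remove(&self, key: &K) -> Option<V> {
        // `HashMap::remove` already returns `Option<V>`, so just forward it.
        self.map.lock().unwrap().remove(key)
    }
}
```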
   
   



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -616,7 +582,7 @@ pub struct ListingTable {
     table_schema: SchemaRef,
     options: ListingOptions,
     definition: Option<String>,
-    collected_statistics: StatisticsCache,
+    collected_statistics: Option<FileStaticCache>,

Review Comment:
   As written, I think this may unexpectedly regress performance for some users. Specifically, because `collected_statistics` is `None` unless a cache is explicitly supplied, by default there will be no caching even within the execution of a single query (which I think was the original use case for this feature).
   
   Thus I recommend this be:
   
   ```suggestion
       collected_statistics: FileStaticCache,
   ```
   
   And then, rather than `ListingTable::try_new_with_cache`, add a function like:
   
   ```rust
   impl ListingTable {
       /// Replace this table's statistics cache (e.g. with a session-wide one).
       pub fn with_cache(mut self, cache: FileStaticCache) -> Self {
           self.collected_statistics = cache;
           self
       }
   }
   ```
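A self-contained sketch of the suggested design, using hypothetical stand-ins: `Table` for `ListingTable`, `StatsCache`/`DefaultStatsCache` for the real cache trait and implementation, and a one-field `Statistics`. It illustrates that every table starts with its own private cache, so a single query still benefits from caching, while `with_cache` lets a caller swap in a session-wide cache shared across tables:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `datafusion_common::Statistics`.
#[derive(Clone, Debug, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
}

/// Hypothetical minimal cache trait (stand-in for `CacheAccessor`).
pub trait StatsCache: Send + Sync {
    fn get(&self, path: &str) -> Option<Statistics>;
    fn put(&self, path: &str, stats: Statistics);
}

/// Per-table default cache, used when no session-level cache is supplied.
#[derive(Default)]
pub struct DefaultStatsCache {
    map: Mutex<HashMap<String, Statistics>>,
}

impl StatsCache for DefaultStatsCache {
    fn get(&self, path: &str) -> Option<Statistics> {
        self.map.lock().unwrap().get(path).cloned()
    }
    fn put(&self, path: &str, stats: Statistics) {
        self.map.lock().unwrap().insert(path.to_string(), stats);
    }
}

pub type FileStaticCache = Arc<dyn StatsCache>;

/// Hypothetical stand-in for `ListingTable`.
pub struct Table {
    collected_statistics: FileStaticCache,
}

impl Table {
    pub fn new() -> Self {
        // Always start with a private cache, so nothing regresses when unconfigured.
        Self { collected_statistics: Arc::new(DefaultStatsCache::default()) }
    }

    /// Builder-style: replace the table's cache, e.g. with a session-wide one.
    pub fn with_cache(mut self, cache: FileStaticCache) -> Self {
        self.collected_statistics = cache;
        self
    }

    pub fn cache(&self) -> &FileStaticCache {
        &self.collected_statistics
    }
}
```

Because `with_cache` consumes and returns `self`, it chains after a constructor the same way `with_definition` does in `ListingTableFactory`.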



##########
datafusion/execution/src/cache/mod.rs:
##########
@@ -0,0 +1,44 @@
+
+pub mod cache_manager;
+pub mod cache_unit;
+
+// The cache accessor, users usually working on this interface while manipulating caches
+pub trait CacheAccessor<K, V>: Send + Sync {
+    // Extra info but not part of the cache key or cache value.
+    type Extra: Clone;

Review Comment:
   Can you explain what the use case for `Extra` is? Specifically, I wonder why such information could not be added as a field to the `Value`.
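One possible reading of that alternative, sketched with hypothetical stand-in types (`FileMeta` for `ObjectMeta`, a one-field `Statistics`): the cached value carries the metadata it was computed from, and the lookup checks it against the current file metadata. The current metadata still has to be supplied at lookup time, so this moves it from an associated type to an ordinary argument rather than eliminating it:

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `object_store::ObjectMeta` (validation fields only).
#[derive(Clone, Debug, PartialEq)]
pub struct FileMeta {
    pub size: usize,
    pub last_modified: u64, // hypothetical: seconds since the epoch
}

/// Hypothetical stand-in for `Statistics`.
#[derive(Clone, Debug, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
}

/// The value type records the metadata it was computed from,
/// so no separate `Extra` type is needed on the trait.
#[derive(Clone, Debug)]
pub struct CachedStatistics {
    pub meta: FileMeta,
    pub statistics: Statistics,
}

impl CachedStatistics {
    /// Return the statistics only if the file still matches the cached metadata.
    pub fn valid_for(&self, current: &FileMeta) -> Option<&Statistics> {
        (self.meta.size == current.size && self.meta.last_modified == current.last_modified)
            .then_some(&self.statistics)
    }
}

/// Look up `path`, treating entries for changed files as misses.
pub fn lookup<'a>(
    cache: &'a HashMap<String, CachedStatistics>,
    path: &str,
    current: &FileMeta,
) -> Option<&'a Statistics> {
    cache.get(path).and_then(|c| c.valid_for(current))
}
```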



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -0,0 +1,58 @@
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = ObjectMeta>>;

Review Comment:
   I think there is a typo in this name: `FileStaticCache` should be `FileStatisticsCache`.
   
   Perhaps this can be



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -0,0 +1,58 @@
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = ObjectMeta>>;

Review Comment:
   I think it would be nice to:
   1. Derive `Debug` for this
   2. Add some doc strings explaining that this structure is attached to the `RuntimeEnv` and can be used to cache values for longer than a single query
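A sketch of both points, using a hypothetical `StatsCache` trait in place of `CacheAccessor`. A plain `#[derive(Debug)]` would not compile on a struct holding `Arc<dyn StatsCache>`, because `Arc<T>` implements `Debug` only when `T` does; the options are a manual impl (shown here) or making `Debug` a supertrait of the cache trait. The `name` method is an assumption added only for the `Debug` output:

```rust
use std::fmt;
use std::sync::Arc;

/// Hypothetical minimal cache trait (stand-in for `CacheAccessor`).
pub trait StatsCache: Send + Sync {
    /// Short human-readable name, used only for `Debug` output.
    fn name(&self) -> String;
}

/// Hypothetical cache implementation.
pub struct InMemoryStatsCache;

impl StatsCache for InMemoryStatsCache {
    fn name(&self) -> String {
        "InMemoryStatsCache".to_string()
    }
}

/// Session-level cache registry attached to the `RuntimeEnv`.
///
/// Caches registered here are shared by every query in the session,
/// so cached values can outlive a single query.
#[derive(Default)]
pub struct CacheManager {
    file_statistic_cache: Option<Arc<dyn StatsCache>>,
}

// `#[derive(Debug)]` fails here because `dyn StatsCache` is not `Debug`,
// so format each configured cache by name instead.
impl fmt::Debug for CacheManager {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("CacheManager")
            .field(
                "file_statistic_cache",
                &self.file_statistic_cache.as_ref().map(|c| c.name()),
            )
            .finish()
    }
}
```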



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -0,0 +1,58 @@
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = ObjectMeta>>;
+
+#[derive(Default)]
+pub struct CacheManager {
+    file_statistic_cache: Option<FileStaticCache>,
+}
+
+impl CacheManager {
+    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
+        let mut manager = CacheManager::default();
+        if let Some(cc) = &config.table_files_statistics_cache {
+            manager.file_statistic_cache = Some(cc.clone())
+        }
+        Ok(Arc::new(manager))
+    }
+
+    pub fn get_file_statistic_cache(&self) -> Option<FileStaticCache> {

Review Comment:
   I think we should have some docstrings here.
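A sketch of what such doc comments might say, on a simplified copy of the code under review. The `StatsCache` trait and `NoopCache` type are hypothetical stand-ins; `try_new` is shown as an infallible `new` because the version in the diff always returns `Ok`, and the `if let` there is equivalent to cloning the `Option`. The field is named `file_statistics_cache` in both structs, following the naming discussion elsewhere in this review:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the statistics cache trait.
pub trait StatsCache: Send + Sync {}

/// Hypothetical no-op cache, used only for illustration.
pub struct NoopCache;
impl StatsCache for NoopCache {}

/// Hypothetical stand-in for `FileStaticCache`.
pub type FileStatisticsCache = Arc<dyn StatsCache>;

/// Configuration used to build a [`CacheManager`].
#[derive(Clone, Default)]
pub struct CacheManagerConfig {
    /// Cache for per-file statistics; `None` disables session-level caching.
    pub file_statistics_cache: Option<FileStatisticsCache>,
}

/// Holds the caches shared by all queries that use the same `RuntimeEnv`.
#[derive(Default)]
pub struct CacheManager {
    file_statistics_cache: Option<FileStatisticsCache>,
}

impl CacheManager {
    /// Create a `CacheManager` from `config`. Configured caches are shared
    /// (via `Arc` clones) with the config, not copied.
    pub fn new(config: &CacheManagerConfig) -> Arc<Self> {
        Arc::new(Self {
            file_statistics_cache: config.file_statistics_cache.clone(),
        })
    }

    /// Return the session-level file statistics cache, if one was configured.
    ///
    /// The returned handle is a cheap `Arc` clone pointing at the same cache.
    pub fn get_file_statistic_cache(&self) -> Option<FileStatisticsCache> {
        self.file_statistics_cache.clone()
    }
}
```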



##########
datafusion/execution/src/cache/mod.rs:
##########
@@ -0,0 +1,44 @@
+
+pub mod cache_manager;
+pub mod cache_unit;
+
+// The cache accessor, users usually working on this interface while manipulating caches
+pub trait CacheAccessor<K, V>: Send + Sync {

Review Comment:
   Another classic API to add here would be `clear()`, to remove all the entries.
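A sketch of `clear()` on a cut-down version of the trait, with a hypothetical `RwLock<HashMap>`-backed implementation (`RwLockCache` is not from the PR; the PR's own implementation uses `DashMap`, which also provides a `clear` method):

```rust
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::RwLock;

/// Hypothetical cut-down accessor trait with a `clear` method.
pub trait CacheAccessor<K, V>: Send + Sync {
    fn get(&self, k: &K) -> Option<V>;
    fn put(&self, k: &K, v: V) -> Option<V>;
    fn len(&self) -> usize;
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
    /// Remove every entry from the cache.
    fn clear(&self);
}

/// Hypothetical implementation: reads share the lock, writes take it exclusively.
pub struct RwLockCache<K, V> {
    map: RwLock<HashMap<K, V>>,
}

impl<K, V> RwLockCache<K, V> {
    pub fn new() -> Self {
        Self { map: RwLock::new(HashMap::new()) }
    }
}

impl<K, V> CacheAccessor<K, V> for RwLockCache<K, V>
where
    K: Eq + Hash + Clone + Send + Sync,
    V: Clone + Send + Sync,
{
    fn get(&self, k: &K) -> Option<V> {
        self.map.read().unwrap().get(k).cloned()
    }
    fn put(&self, k: &K, v: V) -> Option<V> {
        self.map.write().unwrap().insert(k.clone(), v)
    }
    fn len(&self) -> usize {
        self.map.read().unwrap().len()
    }
    fn clear(&self) {
        self.map.write().unwrap().clear()
    }
}
```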



##########
datafusion/core/src/datasource/listing_table_factory.rs:
##########
@@ -229,8 +229,14 @@ impl TableProviderFactory for ListingTableFactory {
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
             .with_schema(resolved_schema);
-        let table =
-            ListingTable::try_new(config)?.with_definition(cmd.definition.clone());
+        let provider;

Review Comment:
   If you implement the `with_cache` API, this can look like:
   
   ```rust
   let provider = ListingTable::try_new(config)?
     .with_cache(state.runtime_env().cache_manager.get_file_statistic_cache());
   ```



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -1092,6 +1092,95 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn load_table_stats_with_session_level_cache() -> Result<()> {
+        let testdata = crate::test_util::parquet_test_data();

Review Comment:
   Nice, thank you. Since this is an end-to-end test, I recommend moving it somewhere under `core_integration`, perhaps `datafusion/core/tests/core_integration.rs`.



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -0,0 +1,58 @@
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = ObjectMeta>>;
+
+#[derive(Default)]
+pub struct CacheManager {
+    file_statistic_cache: Option<FileStaticCache>,
+}
+
+impl CacheManager {
+    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
+        let mut manager = CacheManager::default();
+        if let Some(cc) = &config.table_files_statistics_cache {
+            manager.file_statistic_cache = Some(cc.clone())
+        }
+        Ok(Arc::new(manager))
+    }
+
+    pub fn get_file_statistic_cache(&self) -> Option<FileStaticCache> {
+        self.file_statistic_cache.clone()
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct CacheManagerConfig {
+    /// Enable cache of files statistics when listing files.
+    /// Avoid get same file statistics repeatedly in same datafusion session.
+    /// Default is disable. Fow now only supports Parquet files.
+    pub table_files_statistics_cache: Option<FileStaticCache>,
+}
+
+impl CacheManagerConfig {
+    pub fn enable_table_files_statistics_cache(mut self, cache: FileStaticCache) -> Self {

Review Comment:
   Can we possibly have the field names match? Here it is called `table_files_statistics_cache`, but on the `CacheManager` it is called `file_statistic_cache`. I think they should be the same in both places (I like `file_statistics_cache` best, as it matches the type name).



##########
datafusion/execution/src/cache/cache_unit.rs:
##########
@@ -0,0 +1,132 @@
+
+use crate::cache::CacheAccessor;
+use dashmap::DashMap;
+use datafusion_common::Statistics;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+
+/// Collected statistics for files
+/// Cache is invalided when file size or last modification has changed
+#[derive(Default)]
+pub struct FileStatisticsCache {
+    statistics: DashMap<Path, (ObjectMeta, Statistics)>,
+}
+
+impl CacheAccessor<Path, Statistics> for FileStatisticsCache {
+    type Extra = ObjectMeta;
+
+    /// Get `Statistics` for file location.
+    fn get(&self, k: &Path) -> Option<Statistics> {
+        self.statistics
+            .get(k)
+            .map(|s| Some(s.value().1.clone()))
+            .unwrap_or(None)
+    }
+
+    /// Get `Statistics` for file location. Returns None if file has changed or not found.
+    fn get_with_extra(&self, k: &Path, e: &Self::Extra) -> Option<Statistics> {

Review Comment:
   As written, this is going to copy the statistics on every cache hit (though I realize the existing `StatisticsCache` did the same). Maybe we could store the statistics as `Arc<Statistics>` instead, so that a hit only clones a pointer.
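A sketch of the `Arc<Statistics>` idea, with hypothetical stand-ins for `ObjectMeta` and `Statistics` and a `Mutex<HashMap>` in place of `DashMap`. The stand-in `Statistics` deliberately does not implement `Clone`, so a cache hit can only hand out another reference-counted pointer. As in the PR's doc comment, an entry is treated as stale when the file size or last-modified time differ:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `object_store::ObjectMeta`.
#[derive(Clone, Debug, PartialEq)]
pub struct FileMeta {
    pub size: usize,
    pub last_modified: u64, // hypothetical: seconds since the epoch
}

/// Hypothetical stand-in for `Statistics`; intentionally not `Clone`.
#[derive(Debug, PartialEq)]
pub struct Statistics {
    pub num_rows: Option<usize>,
}

/// Stores `Arc<Statistics>` so that a hit clones a pointer, not the statistics.
#[derive(Default)]
pub struct ArcStatisticsCache {
    entries: Mutex<HashMap<String, (FileMeta, Arc<Statistics>)>>,
}

impl ArcStatisticsCache {
    /// Insert statistics for `path`, returning the previously cached ones, if any.
    pub fn put_with_extra(
        &self,
        path: &str,
        stats: Arc<Statistics>,
        meta: &FileMeta,
    ) -> Option<Arc<Statistics>> {
        self.entries
            .lock()
            .unwrap()
            .insert(path.to_string(), (meta.clone(), stats))
            .map(|(_, old)| old)
    }

    /// Return `None` if `path` is unknown or the file changed since it was cached.
    pub fn get_with_extra(&self, path: &str, meta: &FileMeta) -> Option<Arc<Statistics>> {
        let entries = self.entries.lock().unwrap();
        let (cached_meta, stats) = entries.get(path)?;
        if cached_meta.size == meta.size && cached_meta.last_modified == meta.last_modified {
            Some(Arc::clone(stats)) // refcount bump only
        } else {
            None
        }
    }
}
```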



##########
datafusion/execution/src/cache/cache_manager.rs:
##########
@@ -0,0 +1,58 @@
+
+use crate::cache::CacheAccessor;
+use datafusion_common::{Result, Statistics};
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use std::sync::Arc;
+
+pub type FileStaticCache = Arc<dyn CacheAccessor<Path, Statistics, Extra = ObjectMeta>>;
+
+#[derive(Default)]
+pub struct CacheManager {
+    file_statistic_cache: Option<FileStaticCache>,
+}
+
+impl CacheManager {
+    pub fn try_new(config: &CacheManagerConfig) -> Result<Arc<Self>> {
+        let mut manager = CacheManager::default();
+        if let Some(cc) = &config.table_files_statistics_cache {
+            manager.file_statistic_cache = Some(cc.clone())
+        }
+        Ok(Arc::new(manager))
+    }
+
+    pub fn get_file_statistic_cache(&self) -> Option<FileStaticCache> {
+        self.file_statistic_cache.clone()
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct CacheManagerConfig {
+    /// Enable cache of files statistics when listing files.
+    /// Avoid get same file statistics repeatedly in same datafusion session.
+    /// Default is disable. Fow now only supports Parquet files.
+    pub table_files_statistics_cache: Option<FileStaticCache>,
+}
+
+impl CacheManagerConfig {
+    pub fn enable_table_files_statistics_cache(mut self, cache: FileStaticCache) -> Self {

Review Comment:
   I agree, this looks good to me.
   
   Maybe we could make this follow a more builder-style API, as elsewhere in DataFusion:
   
   ```rust
   pub fn with_table_files_statistics_cache(mut self, cache: Option<FileStaticCache>) -> Self {
   ```
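A sketch of that builder-style setter on a simplified config; the `StatsCache` trait and `NoopCache` type are hypothetical stand-ins. Taking an `Option` lets callers disable the cache through the same method:

```rust
use std::sync::Arc;

/// Hypothetical stand-in for the statistics cache trait.
pub trait StatsCache: Send + Sync {}

/// Hypothetical no-op cache, used only for illustration.
pub struct NoopCache;
impl StatsCache for NoopCache {}

pub type FileStaticCache = Arc<dyn StatsCache>;

#[derive(Clone, Default)]
pub struct CacheManagerConfig {
    pub table_files_statistics_cache: Option<FileStaticCache>,
}

impl CacheManagerConfig {
    /// Builder-style setter: takes `self` by value and returns it, so calls chain.
    /// Passing `None` disables (or keeps disabled) the statistics cache.
    pub fn with_table_files_statistics_cache(mut self, cache: Option<FileStaticCache>) -> Self {
        self.table_files_statistics_cache = cache;
        self
    }
}
```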
   
   
   



##########
datafusion/execution/src/cache/mod.rs:
##########
@@ -0,0 +1,44 @@
+
+pub mod cache_manager;
+pub mod cache_unit;
+
+// The cache accessor, users usually working on this interface while manipulating caches

Review Comment:
   ```suggestion
   /// The cache accessor: the interface users typically work with when manipulating caches.
   ///
   /// This interface does not take `mut` references and thus has to handle its own
   /// locking via interior mutability. It can be accessed by multiple concurrent queries
   /// during planning and execution.
   ```
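A sketch of what that doc comment describes: because every method takes `&self`, one cache can sit behind an `Arc<dyn ...>` and be used from several threads at once, with the implementation doing its own locking. `SharedCache` and `run_concurrent_queries` are hypothetical, and plain threads stand in for concurrent queries:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::thread;

/// Cut-down, hypothetical version of the accessor trait (`&self` only).
pub trait CacheAccessor<K, V>: Send + Sync {
    fn get(&self, k: &K) -> Option<V>;
    fn put(&self, k: &K, v: V) -> Option<V>;
}

/// Hypothetical implementation: the `Mutex` provides interior mutability.
#[derive(Default)]
pub struct SharedCache {
    map: Mutex<HashMap<String, u64>>,
}

impl CacheAccessor<String, u64> for SharedCache {
    fn get(&self, k: &String) -> Option<u64> {
        self.map.lock().unwrap().get(k).copied()
    }
    fn put(&self, k: &String, v: u64) -> Option<u64> {
        self.map.lock().unwrap().insert(k.clone(), v)
    }
}

/// Simulate `n` concurrent "queries", each writing one entry to a shared cache.
pub fn run_concurrent_queries(n: u64) -> Arc<dyn CacheAccessor<String, u64>> {
    let cache: Arc<dyn CacheAccessor<String, u64>> = Arc::new(SharedCache::default());
    let handles: Vec<_> = (0..n)
        .map(|i| {
            let cache = Arc::clone(&cache);
            thread::spawn(move || {
                // No `&mut` needed: the cache locks internally.
                cache.put(&format!("file-{i}"), i);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    cache
}
```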



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org