You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/07 14:33:20 UTC

[arrow-datafusion] branch master updated: Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap (#4079)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new a9add0e26 Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap (#4079)
a9add0e26 is described below

commit a9add0e267f387559e155e9a477eee2cc99d0f2b
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Nov 7 22:33:15 2022 +0800

    Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap (#4079)
    
    * Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap
    
    * Fix Cargo.lock in datafusion-cli
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 datafusion-cli/Cargo.lock                       | 14 ++++++++++++
 datafusion/core/Cargo.toml                      |  1 +
 datafusion/core/src/catalog/catalog.rs          | 29 +++++++++---------------
 datafusion/core/src/catalog/schema.rs           | 25 +++++++++------------
 datafusion/core/src/datasource/listing/table.rs | 30 +++++++++++++------------
 datafusion/core/src/datasource/object_store.rs  | 30 ++++++++++++-------------
 6 files changed, 67 insertions(+), 62 deletions(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index e746bf97b..8d1e53ec2 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -555,6 +555,19 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "dashmap"
+version = "5.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
+dependencies = [
+ "cfg-if",
+ "hashbrown",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
 [[package]]
 name = "datafusion"
 version = "14.0.0"
@@ -566,6 +579,7 @@ dependencies = [
  "bytes",
  "bzip2",
  "chrono",
+ "dashmap",
  "datafusion-common",
  "datafusion-expr",
  "datafusion-optimizer",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 46a2fb3c4..9ca47864d 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -62,6 +62,7 @@ async-trait = "0.1.41"
 bytes = "1.1"
 bzip2 = "0.4.3"
 chrono = { version = "0.4.22", default-features = false }
+dashmap = "5.4.0"
 datafusion-common = { path = "../common", version = "14.0.0", features = ["parquet", "object_store"] }
 datafusion-expr = { path = "../expr", version = "14.0.0" }
 datafusion-jit = { path = "../jit", version = "14.0.0", optional = true }
diff --git a/datafusion/core/src/catalog/catalog.rs b/datafusion/core/src/catalog/catalog.rs
index 9a932ee35..ccec8bd25 100644
--- a/datafusion/core/src/catalog/catalog.rs
+++ b/datafusion/core/src/catalog/catalog.rs
@@ -19,10 +19,9 @@
 //! representing collections of named schemas.
 
 use crate::catalog::schema::SchemaProvider;
+use dashmap::DashMap;
 use datafusion_common::{DataFusionError, Result};
-use parking_lot::RwLock;
 use std::any::Any;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 /// Represent a list of named catalogs
@@ -49,14 +48,14 @@ pub trait CatalogList: Sync + Send {
 /// Simple in-memory list of catalogs
 pub struct MemoryCatalogList {
     /// Collection of catalogs containing schemas and ultimately TableProviders
-    pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
+    pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
 }
 
 impl MemoryCatalogList {
     /// Instantiates a new `MemoryCatalogList` with an empty collection of catalogs
     pub fn new() -> Self {
         Self {
-            catalogs: RwLock::new(HashMap::new()),
+            catalogs: DashMap::new(),
         }
     }
 }
@@ -77,18 +76,15 @@ impl CatalogList for MemoryCatalogList {
         name: String,
         catalog: Arc<dyn CatalogProvider>,
     ) -> Option<Arc<dyn CatalogProvider>> {
-        let mut catalogs = self.catalogs.write();
-        catalogs.insert(name, catalog)
+        self.catalogs.insert(name, catalog)
     }
 
     fn catalog_names(&self) -> Vec<String> {
-        let catalogs = self.catalogs.read();
-        catalogs.keys().map(|s| s.to_string()).collect()
+        self.catalogs.iter().map(|c| c.key().clone()).collect()
     }
 
     fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
-        let catalogs = self.catalogs.read();
-        catalogs.get(name).cloned()
+        self.catalogs.get(name).map(|c| c.value().clone())
     }
 }
 
@@ -132,14 +128,14 @@ pub trait CatalogProvider: Sync + Send {
 
 /// Simple in-memory implementation of a catalog.
 pub struct MemoryCatalogProvider {
-    schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
+    schemas: DashMap<String, Arc<dyn SchemaProvider>>,
 }
 
 impl MemoryCatalogProvider {
     /// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
     pub fn new() -> Self {
         Self {
-            schemas: RwLock::new(HashMap::new()),
+            schemas: DashMap::new(),
         }
     }
 }
@@ -150,13 +146,11 @@ impl CatalogProvider for MemoryCatalogProvider {
     }
 
     fn schema_names(&self) -> Vec<String> {
-        let schemas = self.schemas.read();
-        schemas.keys().cloned().collect()
+        self.schemas.iter().map(|s| s.key().clone()).collect()
     }
 
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
-        let schemas = self.schemas.read();
-        schemas.get(name).cloned()
+        self.schemas.get(name).map(|s| s.value().clone())
     }
 
     fn register_schema(
@@ -164,8 +158,7 @@ impl CatalogProvider for MemoryCatalogProvider {
         name: &str,
         schema: Arc<dyn SchemaProvider>,
     ) -> Result<Option<Arc<dyn SchemaProvider>>> {
-        let mut schemas = self.schemas.write();
-        Ok(schemas.insert(name.into(), schema))
+        Ok(self.schemas.insert(name.into(), schema))
     }
 }
 
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index b886966bf..df0ef7880 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -18,9 +18,8 @@
 //! Describes the interface and built-in implementations of schemas,
 //! representing collections of named tables.
 
-use parking_lot::RwLock;
+use dashmap::DashMap;
 use std::any::Any;
-use std::collections::HashMap;
 use std::sync::Arc;
 
 use crate::datasource::TableProvider;
@@ -68,14 +67,14 @@ pub trait SchemaProvider: Sync + Send {
 
 /// Simple in-memory implementation of a schema.
 pub struct MemorySchemaProvider {
-    tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
+    tables: DashMap<String, Arc<dyn TableProvider>>,
 }
 
 impl MemorySchemaProvider {
     /// Instantiates a new MemorySchemaProvider with an empty collection of tables.
     pub fn new() -> Self {
         Self {
-            tables: RwLock::new(HashMap::new()),
+            tables: DashMap::new(),
         }
     }
 }
@@ -92,13 +91,14 @@ impl SchemaProvider for MemorySchemaProvider {
     }
 
     fn table_names(&self) -> Vec<String> {
-        let tables = self.tables.read();
-        tables.keys().cloned().collect()
+        self.tables
+            .iter()
+            .map(|table| table.key().clone())
+            .collect()
     }
 
     fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
-        let tables = self.tables.read();
-        tables.get(name).cloned()
+        self.tables.get(name).map(|table| table.value().clone())
     }
 
     fn register_table(
@@ -112,18 +112,15 @@ impl SchemaProvider for MemorySchemaProvider {
                 name
             )));
         }
-        let mut tables = self.tables.write();
-        Ok(tables.insert(name, table))
+        Ok(self.tables.insert(name, table))
     }
 
     fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
-        let mut tables = self.tables.write();
-        Ok(tables.remove(name))
+        Ok(self.tables.remove(name).map(|(_, table)| table))
     }
 
     fn table_exist(&self, name: &str) -> bool {
-        let tables = self.tables.read();
-        tables.contains_key(name)
+        self.tables.contains_key(name)
     }
 }
 
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 61e13abb5..05f560f20 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,16 +17,15 @@
 
 //! The table implementation.
 
-use hashbrown::HashMap;
 use std::str::FromStr;
 use std::{any::Any, sync::Arc};
 
 use arrow::datatypes::{Field, Schema, SchemaRef};
 use async_trait::async_trait;
+use dashmap::DashMap;
 use futures::{future, stream, StreamExt, TryStreamExt};
 use object_store::path::Path;
 use object_store::ObjectMeta;
-use parking_lot::RwLock;
 
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
@@ -266,28 +265,31 @@ impl ListingOptions {
 /// Cache is invalided when file size or last modification has changed
 #[derive(Default)]
 struct StatisticsCache {
-    statistics: RwLock<HashMap<Path, (ObjectMeta, Statistics)>>,
+    statistics: DashMap<Path, (ObjectMeta, Statistics)>,
 }
 
 impl StatisticsCache {
     /// Get `Statistics` for file location. Returns None if file has changed or not found.
     fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
-        let map = self.statistics.read();
-        let (saved_meta, statistics) = map.get(&meta.location)?;
-
-        if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified
-        {
-            // file has changed
-            return None;
-        }
-
-        Some(statistics.clone())
+        self.statistics
+            .get(&meta.location)
+            .map(|s| {
+                let (saved_meta, statistics) = s.value();
+                if saved_meta.size != meta.size
+                    || saved_meta.last_modified != meta.last_modified
+                {
+                    // file has changed
+                    None
+                } else {
+                    Some(statistics.clone())
+                }
+            })
+            .unwrap_or(None)
     }
 
     /// Save collected file statistics
     fn save(&self, meta: ObjectMeta, statistics: Statistics) {
         self.statistics
-            .write()
             .insert(meta.location.clone(), (meta, statistics));
     }
 }
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
index 69b18313c..b6301f96b 100644
--- a/datafusion/core/src/datasource/object_store.rs
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -19,11 +19,10 @@
 //! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
 //! and query data inside these systems.
 
+use dashmap::DashMap;
 use datafusion_common::{DataFusionError, Result};
 use object_store::local::LocalFileSystem;
 use object_store::ObjectStore;
-use parking_lot::RwLock;
-use std::collections::HashMap;
 use std::sync::Arc;
 use url::Url;
 
@@ -125,7 +124,7 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
 /// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
 pub struct ObjectStoreRegistry {
     /// A map from scheme to object store that serve list / read operations for the store
-    object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+    object_stores: DashMap<String, Arc<dyn ObjectStore>>,
     provider: Option<Arc<dyn ObjectStoreProvider>>,
 }
 
@@ -134,7 +133,11 @@ impl std::fmt::Debug for ObjectStoreRegistry {
         f.debug_struct("ObjectStoreRegistry")
             .field(
                 "schemes",
-                &self.object_stores.read().keys().collect::<Vec<_>>(),
+                &self
+                    .object_stores
+                    .iter()
+                    .map(|o| o.key().clone())
+                    .collect::<Vec<_>>(),
             )
             .finish()
     }
@@ -161,10 +164,10 @@ impl ObjectStoreRegistry {
     /// may be explicity registered with calls to [`ObjectStoreRegistry::register_store`] or
     /// created lazily, on-demand by the provided [`ObjectStoreProvider`]
     pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
-        let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
-        map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
+        let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
+        object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
         Self {
-            object_stores: RwLock::new(map),
+            object_stores,
             provider,
         }
     }
@@ -178,9 +181,8 @@ impl ObjectStoreRegistry {
         host: impl AsRef<str>,
         store: Arc<dyn ObjectStore>,
     ) -> Option<Arc<dyn ObjectStore>> {
-        let mut stores = self.object_stores.write();
         let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
-        stores.insert(s, store)
+        self.object_stores.insert(s, store)
     }
 
     /// Get a suitable store for the provided URL. For example:
@@ -192,21 +194,17 @@ impl ObjectStoreRegistry {
     pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
         let url = url.as_ref();
         // First check whether can get object store from registry
-        let store = {
-            let stores = self.object_stores.read();
-            let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
-            stores.get(s).cloned()
-        };
+        let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
+        let store = self.object_stores.get(s).map(|o| o.value().clone());
 
         match store {
             Some(store) => Ok(store),
             None => match &self.provider {
                 Some(provider) => {
                     let store = provider.get_by_url(url)?;
-                    let mut stores = self.object_stores.write();
                     let key =
                         &url[url::Position::BeforeScheme..url::Position::BeforePath];
-                    stores.insert(key.to_owned(), store.clone());
+                    self.object_stores.insert(key.to_owned(), store.clone());
                     Ok(store)
                 }
                 None => Err(DataFusionError::Internal(format!(