You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/07 14:33:20 UTC
[arrow-datafusion] branch master updated: Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap (#4079)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a9add0e26 Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap (#4079)
a9add0e26 is described below
commit a9add0e267f387559e155e9a477eee2cc99d0f2b
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Nov 7 22:33:15 2022 +0800
Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap (#4079)
* Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap
* Fix Cargo.lock in datafusion-cli
Co-authored-by: yangzhong <ya...@ebay.com>
---
datafusion-cli/Cargo.lock | 14 ++++++++++++
datafusion/core/Cargo.toml | 1 +
datafusion/core/src/catalog/catalog.rs | 29 +++++++++---------------
datafusion/core/src/catalog/schema.rs | 25 +++++++++------------
datafusion/core/src/datasource/listing/table.rs | 30 +++++++++++++------------
datafusion/core/src/datasource/object_store.rs | 30 ++++++++++++-------------
6 files changed, 67 insertions(+), 62 deletions(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index e746bf97b..8d1e53ec2 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -555,6 +555,19 @@ dependencies = [
"syn",
]
+[[package]]
+name = "dashmap"
+version = "5.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc"
+dependencies = [
+ "cfg-if",
+ "hashbrown",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
[[package]]
name = "datafusion"
version = "14.0.0"
@@ -566,6 +579,7 @@ dependencies = [
"bytes",
"bzip2",
"chrono",
+ "dashmap",
"datafusion-common",
"datafusion-expr",
"datafusion-optimizer",
diff --git a/datafusion/core/Cargo.toml b/datafusion/core/Cargo.toml
index 46a2fb3c4..9ca47864d 100644
--- a/datafusion/core/Cargo.toml
+++ b/datafusion/core/Cargo.toml
@@ -62,6 +62,7 @@ async-trait = "0.1.41"
bytes = "1.1"
bzip2 = "0.4.3"
chrono = { version = "0.4.22", default-features = false }
+dashmap = "5.4.0"
datafusion-common = { path = "../common", version = "14.0.0", features = ["parquet", "object_store"] }
datafusion-expr = { path = "../expr", version = "14.0.0" }
datafusion-jit = { path = "../jit", version = "14.0.0", optional = true }
diff --git a/datafusion/core/src/catalog/catalog.rs b/datafusion/core/src/catalog/catalog.rs
index 9a932ee35..ccec8bd25 100644
--- a/datafusion/core/src/catalog/catalog.rs
+++ b/datafusion/core/src/catalog/catalog.rs
@@ -19,10 +19,9 @@
//! representing collections of named schemas.
use crate::catalog::schema::SchemaProvider;
+use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
-use parking_lot::RwLock;
use std::any::Any;
-use std::collections::HashMap;
use std::sync::Arc;
/// Represent a list of named catalogs
@@ -49,14 +48,14 @@ pub trait CatalogList: Sync + Send {
/// Simple in-memory list of catalogs
pub struct MemoryCatalogList {
/// Collection of catalogs containing schemas and ultimately TableProviders
- pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
+ pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}
impl MemoryCatalogList {
/// Instantiates a new `MemoryCatalogList` with an empty collection of catalogs
pub fn new() -> Self {
Self {
- catalogs: RwLock::new(HashMap::new()),
+ catalogs: DashMap::new(),
}
}
}
@@ -77,18 +76,15 @@ impl CatalogList for MemoryCatalogList {
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
- let mut catalogs = self.catalogs.write();
- catalogs.insert(name, catalog)
+ self.catalogs.insert(name, catalog)
}
fn catalog_names(&self) -> Vec<String> {
- let catalogs = self.catalogs.read();
- catalogs.keys().map(|s| s.to_string()).collect()
+ self.catalogs.iter().map(|c| c.key().clone()).collect()
}
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
- let catalogs = self.catalogs.read();
- catalogs.get(name).cloned()
+ self.catalogs.get(name).map(|c| c.value().clone())
}
}
@@ -132,14 +128,14 @@ pub trait CatalogProvider: Sync + Send {
/// Simple in-memory implementation of a catalog.
pub struct MemoryCatalogProvider {
- schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
+ schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}
impl MemoryCatalogProvider {
/// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
pub fn new() -> Self {
Self {
- schemas: RwLock::new(HashMap::new()),
+ schemas: DashMap::new(),
}
}
}
@@ -150,13 +146,11 @@ impl CatalogProvider for MemoryCatalogProvider {
}
fn schema_names(&self) -> Vec<String> {
- let schemas = self.schemas.read();
- schemas.keys().cloned().collect()
+ self.schemas.iter().map(|s| s.key().clone()).collect()
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
- let schemas = self.schemas.read();
- schemas.get(name).cloned()
+ self.schemas.get(name).map(|s| s.value().clone())
}
fn register_schema(
@@ -164,8 +158,7 @@ impl CatalogProvider for MemoryCatalogProvider {
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
- let mut schemas = self.schemas.write();
- Ok(schemas.insert(name.into(), schema))
+ Ok(self.schemas.insert(name.into(), schema))
}
}
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index b886966bf..df0ef7880 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -18,9 +18,8 @@
//! Describes the interface and built-in implementations of schemas,
//! representing collections of named tables.
-use parking_lot::RwLock;
+use dashmap::DashMap;
use std::any::Any;
-use std::collections::HashMap;
use std::sync::Arc;
use crate::datasource::TableProvider;
@@ -68,14 +67,14 @@ pub trait SchemaProvider: Sync + Send {
/// Simple in-memory implementation of a schema.
pub struct MemorySchemaProvider {
- tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
+ tables: DashMap<String, Arc<dyn TableProvider>>,
}
impl MemorySchemaProvider {
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
pub fn new() -> Self {
Self {
- tables: RwLock::new(HashMap::new()),
+ tables: DashMap::new(),
}
}
}
@@ -92,13 +91,14 @@ impl SchemaProvider for MemorySchemaProvider {
}
fn table_names(&self) -> Vec<String> {
- let tables = self.tables.read();
- tables.keys().cloned().collect()
+ self.tables
+ .iter()
+ .map(|table| table.key().clone())
+ .collect()
}
fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
- let tables = self.tables.read();
- tables.get(name).cloned()
+ self.tables.get(name).map(|table| table.value().clone())
}
fn register_table(
@@ -112,18 +112,15 @@ impl SchemaProvider for MemorySchemaProvider {
name
)));
}
- let mut tables = self.tables.write();
- Ok(tables.insert(name, table))
+ Ok(self.tables.insert(name, table))
}
fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
- let mut tables = self.tables.write();
- Ok(tables.remove(name))
+ Ok(self.tables.remove(name).map(|(_, table)| table))
}
fn table_exist(&self, name: &str) -> bool {
- let tables = self.tables.read();
- tables.contains_key(name)
+ self.tables.contains_key(name)
}
}
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 61e13abb5..05f560f20 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -17,16 +17,15 @@
//! The table implementation.
-use hashbrown::HashMap;
use std::str::FromStr;
use std::{any::Any, sync::Arc};
use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
+use dashmap::DashMap;
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
-use parking_lot::RwLock;
use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::{
@@ -266,28 +265,31 @@ impl ListingOptions {
/// Cache is invalided when file size or last modification has changed
#[derive(Default)]
struct StatisticsCache {
- statistics: RwLock<HashMap<Path, (ObjectMeta, Statistics)>>,
+ statistics: DashMap<Path, (ObjectMeta, Statistics)>,
}
impl StatisticsCache {
/// Get `Statistics` for file location. Returns None if file has changed or not found.
fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
- let map = self.statistics.read();
- let (saved_meta, statistics) = map.get(&meta.location)?;
-
- if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified
- {
- // file has changed
- return None;
- }
-
- Some(statistics.clone())
+ self.statistics
+ .get(&meta.location)
+ .map(|s| {
+ let (saved_meta, statistics) = s.value();
+ if saved_meta.size != meta.size
+ || saved_meta.last_modified != meta.last_modified
+ {
+ // file has changed
+ None
+ } else {
+ Some(statistics.clone())
+ }
+ })
+ .unwrap_or(None)
}
/// Save collected file statistics
fn save(&self, meta: ObjectMeta, statistics: Statistics) {
self.statistics
- .write()
.insert(meta.location.clone(), (meta, statistics));
}
}
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
index 69b18313c..b6301f96b 100644
--- a/datafusion/core/src/datasource/object_store.rs
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -19,11 +19,10 @@
//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
//! and query data inside these systems.
+use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
-use parking_lot::RwLock;
-use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
@@ -125,7 +124,7 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
pub struct ObjectStoreRegistry {
/// A map from scheme to object store that serve list / read operations for the store
- object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+ object_stores: DashMap<String, Arc<dyn ObjectStore>>,
provider: Option<Arc<dyn ObjectStoreProvider>>,
}
@@ -134,7 +133,11 @@ impl std::fmt::Debug for ObjectStoreRegistry {
f.debug_struct("ObjectStoreRegistry")
.field(
"schemes",
- &self.object_stores.read().keys().collect::<Vec<_>>(),
+ &self
+ .object_stores
+ .iter()
+ .map(|o| o.key().clone())
+ .collect::<Vec<_>>(),
)
.finish()
}
@@ -161,10 +164,10 @@ impl ObjectStoreRegistry {
/// may be explicity registered with calls to [`ObjectStoreRegistry::register_store`] or
/// created lazily, on-demand by the provided [`ObjectStoreProvider`]
pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
- let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
- map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
+ let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
+ object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
- object_stores: RwLock::new(map),
+ object_stores,
provider,
}
}
@@ -178,9 +181,8 @@ impl ObjectStoreRegistry {
host: impl AsRef<str>,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
- let mut stores = self.object_stores.write();
let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
- stores.insert(s, store)
+ self.object_stores.insert(s, store)
}
/// Get a suitable store for the provided URL. For example:
@@ -192,21 +194,17 @@ impl ObjectStoreRegistry {
pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
let url = url.as_ref();
// First check whether can get object store from registry
- let store = {
- let stores = self.object_stores.read();
- let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
- stores.get(s).cloned()
- };
+ let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
+ let store = self.object_stores.get(s).map(|o| o.value().clone());
match store {
Some(store) => Ok(store),
None => match &self.provider {
Some(provider) => {
let store = provider.get_by_url(url)?;
- let mut stores = self.object_stores.write();
let key =
&url[url::Position::BeforeScheme..url::Position::BeforePath];
- stores.insert(key.to_owned(), store.clone());
+ self.object_stores.insert(key.to_owned(), store.clone());
Ok(store)
}
None => Err(DataFusionError::Internal(format!(