You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/18 14:51:53 UTC
[arrow-datafusion] branch master updated: Introduce ObjectStoreProvider to create an object store based on the url (#2906)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 305e26505 Introduce ObjectStoreProvider to create an object store based on the url (#2906)
305e26505 is described below
commit 305e2650522e40a3704ef0e2b4e069b94f1c1e15
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Jul 18 22:51:47 2022 +0800
Introduce ObjectStoreProvider to create an object store based on the url (#2906)
* Introduce ObjectStoreSelfDetector for detector an object store based on the url
* Fix UT
* Fix PR review
* Fix PR review
Co-authored-by: yangzhong <ya...@ebay.com>
---
datafusion/core/src/catalog/schema.rs | 12 +++-
datafusion/core/src/datasource/listing/helpers.rs | 2 +-
datafusion/core/src/datasource/object_store.rs | 73 +++++++++++++++++++----
datafusion/core/src/execution/context.rs | 11 ++--
datafusion/core/src/execution/runtime_env.rs | 14 ++++-
datafusion/core/src/physical_plan/planner.rs | 4 +-
datafusion/core/tests/user_defined_plan.rs | 4 +-
7 files changed, 93 insertions(+), 27 deletions(-)
diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 7634328f3..b886966bf 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -162,7 +162,17 @@ mod tests {
#[tokio::test]
async fn test_schema_register_listing_table() {
let testdata = crate::test_util::parquet_test_data();
- let filename = format!("file:///{}/{}", testdata, "alltypes_plain.parquet");
+ let testdir = if testdata.starts_with('/') {
+ format!("file://{}", testdata)
+ } else {
+ format!("file:///{}", testdata)
+ };
+ let filename = if testdir.ends_with('/') {
+ format!("{}{}", testdir, "alltypes_plain.parquet")
+ } else {
+ format!("{}/{}", testdir, "alltypes_plain.parquet")
+ };
+
let table_path = ListingTableUrl::parse(filename).unwrap();
let catalog = MemoryCatalogProvider::new();
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 873d005b4..b4b93d5fc 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -162,7 +162,7 @@ pub fn split_files(
pub async fn pruned_partition_list<'a>(
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
- filters: &[Expr],
+ filters: &'a [Expr],
file_extension: &'a str,
table_partition_cols: &'a [String],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
index aca5b0ca4..65f390009 100644
--- a/datafusion/core/src/datasource/object_store.rs
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl {
}
}
+/// Object store provider that can detect an object store based on the url
+pub trait ObjectStoreProvider: Send + Sync + 'static {
+ /// Detect a suitable object store based on its url, if possible.
+ /// Returns the object store, or `None` if no suitable store is detected.
+ fn get_by_url(&self, url: &Url) -> Option<Arc<dyn ObjectStore>>;
+}
+
/// Object store registry
+#[derive(Clone)]
pub struct ObjectStoreRegistry {
/// A map from scheme to object store that serve list / read operations for the store
- object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+ object_stores: Arc<RwLock<HashMap<String, Arc<dyn ObjectStore>>>>,
+ provider: Option<Arc<dyn ObjectStoreProvider>>,
}
impl std::fmt::Debug for ObjectStoreRegistry {
@@ -105,13 +114,19 @@ impl Default for ObjectStoreRegistry {
}
impl ObjectStoreRegistry {
+ /// Create the registry without an object store provider (the provider is `None`).
+ pub fn new() -> Self {
+ ObjectStoreRegistry::new_with_provider(None)
+ }
+
/// Create the registry that object stores can be registered into.
/// [`LocalFileSystem`] store is registered by default to support reading local files natively.
- pub fn new() -> Self {
+ pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
- object_stores: RwLock::new(map),
+ object_stores: Arc::new(RwLock::new(map)),
+ provider,
}
}
@@ -132,19 +147,43 @@ impl ObjectStoreRegistry {
///
/// - URL with scheme `file:///` or no scheme will return the default LocalFS store
/// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
+ /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store if it's registered
///
pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
let url = url.as_ref();
- let s = &url[url::Position::BeforeScheme..url::Position::AfterHost];
- let stores = self.object_stores.read();
- let store = stores.get(s).ok_or_else(|| {
- DataFusionError::Internal(format!(
- "No suitable object store found for {}",
- url
- ))
- })?;
-
- Ok(store.clone())
+ // First check whether the object store can be obtained from the registry
+ let store = {
+ let stores = self.object_stores.read();
+ let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
+ stores.get(s).cloned()
+ };
+
+ // If not, then try to detect it based on its url.
+ let store = store
+ .or_else(|| {
+ if let Some(provider) = &self.provider {
+ // If detected, register it
+ if let Some(store) = provider.get_by_url(url) {
+ let mut stores = self.object_stores.write();
+ let key =
+ &url[url::Position::BeforeScheme..url::Position::BeforePath];
+ stores.insert(key.to_owned(), store.clone());
+ Some(store)
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ })
+ .ok_or_else(|| {
+ DataFusionError::Internal(format!(
+ "No suitable object store found for {}",
+ url
+ ))
+ })?;
+
+ Ok(store)
}
}
@@ -190,6 +229,14 @@ mod tests {
assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
}
+ #[test]
+ fn test_get_by_url_hdfs() {
+ let sut = ObjectStoreRegistry::default();
+ sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
+ let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
+ sut.get_by_url(&url).unwrap();
+ }
+
#[test]
fn test_get_by_url_s3() {
let sut = ObjectStoreRegistry::default();
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 5cb45be2f..e186e829a 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -85,7 +85,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
-use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -180,7 +180,7 @@ impl SessionContext {
/// Creates a new session context using the provided session configuration.
pub fn with_config(config: SessionConfig) -> Self {
- let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+ let runtime = Arc::new(RuntimeEnv::default());
Self::with_config_rt(config, runtime)
}
@@ -1211,10 +1211,7 @@ impl Debug for SessionState {
/// Default session builder using the provided configuration
pub fn default_session_builder(config: SessionConfig) -> SessionState {
- SessionState::with_config_rt(
- config,
- Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()),
- )
+ SessionState::with_config_rt(config, Arc::new(RuntimeEnv::default()))
}
impl SessionState {
@@ -1902,7 +1899,7 @@ mod tests {
#[tokio::test]
async fn custom_query_planner() -> Result<()> {
- let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+ let runtime = Arc::new(RuntimeEnv::default());
let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime)
.with_query_planner(Arc::new(MyQueryPlanner {}));
let ctx = SessionContext::with_state(session_state);
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index d810c882f..36159db8e 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -57,12 +57,13 @@ impl RuntimeEnv {
let RuntimeConfig {
memory_manager,
disk_manager,
+ object_store_registry,
} = config;
Ok(Self {
memory_manager: MemoryManager::new(memory_manager),
disk_manager: DiskManager::try_new(disk_manager)?,
- object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+ object_store_registry: Arc::new(object_store_registry),
})
}
@@ -121,6 +122,8 @@ pub struct RuntimeConfig {
pub disk_manager: DiskManagerConfig,
/// MemoryManager to limit access to memory
pub memory_manager: MemoryManagerConfig,
+ /// ObjectStoreRegistry to get object store based on url
+ pub object_store_registry: ObjectStoreRegistry,
}
impl RuntimeConfig {
@@ -141,6 +144,15 @@ impl RuntimeConfig {
self
}
+ /// Customize object store registry
+ pub fn with_object_store_registry(
+ mut self,
+ object_store_registry: ObjectStoreRegistry,
+ ) -> Self {
+ self.object_store_registry = object_store_registry;
+ self
+ }
+
/// Specify the total memory to use while running the DataFusion
/// plan to `max_memory * memory_fraction` in bytes.
///
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 90ff7a448..46e09ff27 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1582,7 +1582,7 @@ mod tests {
use crate::assert_contains;
use crate::execution::context::TaskContext;
use crate::execution::options::CsvReadOptions;
- use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+ use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Extension;
use crate::physical_plan::{
expressions, DisplayFormatType, Partitioning, Statistics,
@@ -1604,7 +1604,7 @@ mod tests {
use std::{any::Any, fmt};
fn make_session_state() -> SessionState {
- let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+ let runtime = Arc::new(RuntimeEnv::default());
SessionState::with_config_rt(SessionConfig::new(), runtime)
}
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 7e0a7a600..13ddb1eb8 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -87,7 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
use async_trait::async_trait;
use datafusion::execution::context::TaskContext;
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::plan::{Extension, Sort};
use datafusion::logical_plan::{DFSchemaRef, Limit};
use datafusion::optimizer::optimizer::OptimizerConfig;
@@ -247,7 +247,7 @@ async fn topk_plan() -> Result<()> {
fn make_topk_context() -> SessionContext {
let config = SessionConfig::new().with_target_partitions(48);
- let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+ let runtime = Arc::new(RuntimeEnv::default());
let state = SessionState::with_config_rt(config, runtime)
.with_query_planner(Arc::new(TopKQueryPlanner {}))
.add_optimizer_rule(Arc::new(TopKOptimizerRule {}));