Posted to commits@arrow.apache.org by tu...@apache.org on 2022/07/18 14:51:53 UTC

[arrow-datafusion] branch master updated: Introduce ObjectStoreProvider to create an object store based on the url (#2906)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 305e26505 Introduce ObjectStoreProvider to create an object store based on the url (#2906)
305e26505 is described below

commit 305e2650522e40a3704ef0e2b4e069b94f1c1e15
Author: yahoNanJing <90...@users.noreply.github.com>
AuthorDate: Mon Jul 18 22:51:47 2022 +0800

    Introduce ObjectStoreProvider to create an object store based on the url (#2906)
    
    * Introduce ObjectStoreSelfDetector to detect an object store based on the url
    
    * Fix UT
    
    * Fix PR review
    
    * Fix PR review
    
    Co-authored-by: yangzhong <ya...@ebay.com>
---
 datafusion/core/src/catalog/schema.rs             | 12 +++-
 datafusion/core/src/datasource/listing/helpers.rs |  2 +-
 datafusion/core/src/datasource/object_store.rs    | 73 +++++++++++++++++++----
 datafusion/core/src/execution/context.rs          | 11 ++--
 datafusion/core/src/execution/runtime_env.rs      | 14 ++++-
 datafusion/core/src/physical_plan/planner.rs      |  4 +-
 datafusion/core/tests/user_defined_plan.rs        |  4 +-
 7 files changed, 93 insertions(+), 27 deletions(-)

diff --git a/datafusion/core/src/catalog/schema.rs b/datafusion/core/src/catalog/schema.rs
index 7634328f3..b886966bf 100644
--- a/datafusion/core/src/catalog/schema.rs
+++ b/datafusion/core/src/catalog/schema.rs
@@ -162,7 +162,17 @@ mod tests {
     #[tokio::test]
     async fn test_schema_register_listing_table() {
         let testdata = crate::test_util::parquet_test_data();
-        let filename = format!("file:///{}/{}", testdata, "alltypes_plain.parquet");
+        let testdir = if testdata.starts_with('/') {
+            format!("file://{}", testdata)
+        } else {
+            format!("file:///{}", testdata)
+        };
+        let filename = if testdir.ends_with('/') {
+            format!("{}{}", testdir, "alltypes_plain.parquet")
+        } else {
+            format!("{}/{}", testdir, "alltypes_plain.parquet")
+        };
+
         let table_path = ListingTableUrl::parse(filename).unwrap();
 
         let catalog = MemoryCatalogProvider::new();
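
The test change above has to cope with two shapes of path returned by parquet_test_data(): an absolute path (leading '/') and a relative one, with or without a trailing '/'. For illustration only, a small standalone sketch (the literal paths are made up, not taken from the test data) of why the branch on the leading slash matters when building a file:// URL:

fn main() {
    let absolute = "/testing/data"; // hypothetical absolute path
    let relative = "testing/data";  // hypothetical relative path

    // Prepending "file://" to an absolute path already yields three slashes...
    assert_eq!(format!("file://{}", absolute), "file:///testing/data");
    // ...while blindly using "file:///" would produce a malformed four-slash URL.
    assert_eq!(format!("file:///{}", absolute), "file:////testing/data");
    // Relative paths need the extra slash supplied by "file:///".
    assert_eq!(format!("file:///{}", relative), "file:///testing/data");
}
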
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 873d005b4..b4b93d5fc 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -162,7 +162,7 @@ pub fn split_files(
 pub async fn pruned_partition_list<'a>(
     store: &'a dyn ObjectStore,
     table_path: &'a ListingTableUrl,
-    filters: &[Expr],
+    filters: &'a [Expr],
     file_extension: &'a str,
     table_partition_cols: &'a [String],
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
diff --git a/datafusion/core/src/datasource/object_store.rs b/datafusion/core/src/datasource/object_store.rs
index aca5b0ca4..65f390009 100644
--- a/datafusion/core/src/datasource/object_store.rs
+++ b/datafusion/core/src/datasource/object_store.rs
@@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl {
     }
 }
 
+/// Object store provider can detect an object store based on the url
+pub trait ObjectStoreProvider: Send + Sync + 'static {
+    /// Detect a suitable object store based on its url if possible
+    /// Return the object store if one is detected
+    fn get_by_url(&self, url: &Url) -> Option<Arc<dyn ObjectStore>>;
+}
+
 /// Object store registry
+#[derive(Clone)]
 pub struct ObjectStoreRegistry {
     /// A map from scheme to object store that serve list / read operations for the store
-    object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
+    object_stores: Arc<RwLock<HashMap<String, Arc<dyn ObjectStore>>>>,
+    provider: Option<Arc<dyn ObjectStoreProvider>>,
 }
 
 impl std::fmt::Debug for ObjectStoreRegistry {
@@ -105,13 +114,19 @@ impl Default for ObjectStoreRegistry {
 }
 
 impl ObjectStoreRegistry {
+    /// By default the provider is None
+    pub fn new() -> Self {
+        ObjectStoreRegistry::new_with_provider(None)
+    }
+
     /// Create the registry that object stores can be registered into.
     /// [`LocalFileSystem`] store is registered in by default to support reading local files natively.
-    pub fn new() -> Self {
+    pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
         let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
         map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
         Self {
-            object_stores: RwLock::new(map),
+            object_stores: Arc::new(RwLock::new(map)),
+            provider,
         }
     }
 
@@ -132,19 +147,43 @@ impl ObjectStoreRegistry {
     ///
     /// - URL with scheme `file:///` or no scheme will return the default LocalFS store
     /// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
+    /// - URL with scheme `hdfs://hostname:port/` will return the hdfs store if it's registered
     ///
     pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
         let url = url.as_ref();
-        let s = &url[url::Position::BeforeScheme..url::Position::AfterHost];
-        let stores = self.object_stores.read();
-        let store = stores.get(s).ok_or_else(|| {
-            DataFusionError::Internal(format!(
-                "No suitable object store found for {}",
-                url
-            ))
-        })?;
-
-        Ok(store.clone())
+        // First check whether the object store can be obtained from the registry
+        let store = {
+            let stores = self.object_stores.read();
+            let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
+            stores.get(s).cloned()
+        };
+
+        // If not, then try to detect one based on its url using the provider.
+        let store = store
+            .or_else(|| {
+                if let Some(provider) = &self.provider {
+                    // If detected, register it
+                    if let Some(store) = provider.get_by_url(url) {
+                        let mut stores = self.object_stores.write();
+                        let key =
+                            &url[url::Position::BeforeScheme..url::Position::BeforePath];
+                        stores.insert(key.to_owned(), store.clone());
+                        Some(store)
+                    } else {
+                        None
+                    }
+                } else {
+                    None
+                }
+            })
+            .ok_or_else(|| {
+                DataFusionError::Internal(format!(
+                    "No suitable object store found for {}",
+                    url
+                ))
+            })?;
+
+        Ok(store)
     }
 }
 
@@ -190,6 +229,14 @@ mod tests {
         assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
     }
 
+    #[test]
+    fn test_get_by_url_hdfs() {
+        let sut = ObjectStoreRegistry::default();
+        sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
+        let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
+        sut.get_by_url(&url).unwrap();
+    }
+
     #[test]
     fn test_get_by_url_s3() {
         let sut = ObjectStoreRegistry::default();
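
For illustration, a minimal sketch of how the new ObjectStoreProvider hook might be used. This is not part of the patch: HdfsProvider is a hypothetical type (LocalFileSystem only stands in for a real HDFS-backed store), and the module paths plus direct dependencies on the url and object_store crates are assumptions based on the file locations above.

use std::sync::Arc;

use datafusion::datasource::listing::ListingTableUrl;
use datafusion::datasource::object_store::{ObjectStoreProvider, ObjectStoreRegistry};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use url::Url;

// Hypothetical provider that lazily detects a store for hdfs:// urls.
// LocalFileSystem is merely a stand-in for a real HDFS-backed ObjectStore.
struct HdfsProvider;

impl ObjectStoreProvider for HdfsProvider {
    fn get_by_url(&self, url: &Url) -> Option<Arc<dyn ObjectStore>> {
        if url.scheme() == "hdfs" {
            Some(Arc::new(LocalFileSystem::new()))
        } else {
            None
        }
    }
}

fn main() {
    // Build a registry whose fallback is the provider above.
    let registry = ObjectStoreRegistry::new_with_provider(Some(Arc::new(HdfsProvider)));

    // Nothing was registered for hdfs://, so the lookup falls through to the
    // provider; the detected store is cached under "hdfs://localhost:8020"
    // and returned.
    let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
    let _store = registry.get_by_url(&url).unwrap();
}

The existing test_get_by_url_hdfs test above covers the explicit register_store path; the sketch exercises the new provider fallback instead.
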
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 5cb45be2f..e186e829a 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -85,7 +85,7 @@ use crate::config::{
     ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
     OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
 };
-use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use crate::execution::runtime_env::RuntimeEnv;
 use crate::logical_plan::plan::Explain;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
 use crate::physical_plan::planner::DefaultPhysicalPlanner;
@@ -180,7 +180,7 @@ impl SessionContext {
 
     /// Creates a new session context using the provided session configuration.
     pub fn with_config(config: SessionConfig) -> Self {
-        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+        let runtime = Arc::new(RuntimeEnv::default());
         Self::with_config_rt(config, runtime)
     }
 
@@ -1211,10 +1211,7 @@ impl Debug for SessionState {
 
 /// Default session builder using the provided configuration
 pub fn default_session_builder(config: SessionConfig) -> SessionState {
-    SessionState::with_config_rt(
-        config,
-        Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()),
-    )
+    SessionState::with_config_rt(config, Arc::new(RuntimeEnv::default()))
 }
 
 impl SessionState {
@@ -1902,7 +1899,7 @@ mod tests {
 
     #[tokio::test]
     async fn custom_query_planner() -> Result<()> {
-        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+        let runtime = Arc::new(RuntimeEnv::default());
         let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime)
             .with_query_planner(Arc::new(MyQueryPlanner {}));
         let ctx = SessionContext::with_state(session_state);
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index d810c882f..36159db8e 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -57,12 +57,13 @@ impl RuntimeEnv {
         let RuntimeConfig {
             memory_manager,
             disk_manager,
+            object_store_registry,
         } = config;
 
         Ok(Self {
             memory_manager: MemoryManager::new(memory_manager),
             disk_manager: DiskManager::try_new(disk_manager)?,
-            object_store_registry: Arc::new(ObjectStoreRegistry::new()),
+            object_store_registry: Arc::new(object_store_registry),
         })
     }
 
@@ -121,6 +122,8 @@ pub struct RuntimeConfig {
     pub disk_manager: DiskManagerConfig,
     /// MemoryManager to limit access to memory
     pub memory_manager: MemoryManagerConfig,
+    /// ObjectStoreRegistry to get an object store based on its url
+    pub object_store_registry: ObjectStoreRegistry,
 }
 
 impl RuntimeConfig {
@@ -141,6 +144,15 @@ impl RuntimeConfig {
         self
     }
 
+    /// Customize object store registry
+    pub fn with_object_store_registry(
+        mut self,
+        object_store_registry: ObjectStoreRegistry,
+    ) -> Self {
+        self.object_store_registry = object_store_registry;
+        self
+    }
+
     /// Specify the total memory to use while running the DataFusion
     /// plan to `max_memory * memory_fraction` in bytes.
     ///
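
To show how the new RuntimeConfig field is intended to be consumed, a hedged sketch of wiring a customized registry into a session (module paths are assumed; my_provider stands for any ObjectStoreProvider implementation, such as the HdfsProvider sketched earlier):

use std::sync::Arc;

use datafusion::datasource::object_store::{ObjectStoreProvider, ObjectStoreRegistry};
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

fn make_context(my_provider: Arc<dyn ObjectStoreProvider>) -> SessionContext {
    // Registry that falls back to the provider for unregistered url schemes.
    let registry = ObjectStoreRegistry::new_with_provider(Some(my_provider));

    // Plug the registry into the runtime configuration introduced above.
    let config = RuntimeConfig::default().with_object_store_registry(registry);
    let runtime = Arc::new(RuntimeEnv::new(config).unwrap());

    SessionContext::with_config_rt(SessionConfig::new(), runtime)
}
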
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 90ff7a448..46e09ff27 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1582,7 +1582,7 @@ mod tests {
     use crate::assert_contains;
     use crate::execution::context::TaskContext;
     use crate::execution::options::CsvReadOptions;
-    use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+    use crate::execution::runtime_env::RuntimeEnv;
     use crate::logical_plan::plan::Extension;
     use crate::physical_plan::{
         expressions, DisplayFormatType, Partitioning, Statistics,
@@ -1604,7 +1604,7 @@ mod tests {
     use std::{any::Any, fmt};
 
     fn make_session_state() -> SessionState {
-        let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+        let runtime = Arc::new(RuntimeEnv::default());
         SessionState::with_config_rt(SessionConfig::new(), runtime)
     }
 
diff --git a/datafusion/core/tests/user_defined_plan.rs b/datafusion/core/tests/user_defined_plan.rs
index 7e0a7a600..13ddb1eb8 100644
--- a/datafusion/core/tests/user_defined_plan.rs
+++ b/datafusion/core/tests/user_defined_plan.rs
@@ -87,7 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};
 
 use async_trait::async_trait;
 use datafusion::execution::context::TaskContext;
-use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
+use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::logical_plan::plan::{Extension, Sort};
 use datafusion::logical_plan::{DFSchemaRef, Limit};
 use datafusion::optimizer::optimizer::OptimizerConfig;
@@ -247,7 +247,7 @@ async fn topk_plan() -> Result<()> {
 
 fn make_topk_context() -> SessionContext {
     let config = SessionConfig::new().with_target_partitions(48);
-    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
+    let runtime = Arc::new(RuntimeEnv::default());
     let state = SessionState::with_config_rt(config, runtime)
         .with_query_planner(Arc::new(TopKQueryPlanner {}))
         .add_optimizer_rule(Arc::new(TopKOptimizerRule {}));