Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/04 18:23:12 UTC

[arrow-datafusion] branch master updated: Automatically register tables if ObjectStore root is configured (#4095)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a67d0d26 Automatically register tables if ObjectStore root is configured (#4095)
4a67d0d26 is described below

commit 4a67d0d2613d8c4b323e0dd5a6afe1eb1115c7ba
Author: Brent Gardner <br...@spaceandtime.io>
AuthorDate: Fri Nov 4 12:23:07 2022 -0600

    Automatically register tables if ObjectStore root is configured (#4095)
    
    * squash
    
    * Debug test in CI :'(
    
    * Hashing inconsistency
    
    * Address Andy's concerns
    
    * Docs
    
    * Docs
    
    * fmt
    
    * treat empty string like None :(
    
    * clippy
    
    * PR feedback
    
    * Update datafusion/core/src/config.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    * Update datafusion/core/src/config.rs
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 datafusion/core/src/catalog/listing_schema.rs   | 163 ++++++++++++++++++++++++
 datafusion/core/src/catalog/mod.rs              |   1 +
 datafusion/core/src/config.rs                   |  33 +++--
 datafusion/core/src/execution/context.rs        |  99 +++++++++++++-
 datafusion/core/tests/sql/information_schema.rs |   2 +
 docs/source/user-guide/configs.md               |   7 +-
 6 files changed, 291 insertions(+), 14 deletions(-)
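
A minimal sketch of how the new options are wired up, adapted from the
`with_listing_schema_provider` test added below. `DeltaTableFactory` is
hypothetical (the name comes from the new module's docs), standing in for
any registered `TableProviderFactory`:

    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::datasource::datasource::TableProviderFactory;
    use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

    async fn auto_register() -> datafusion::error::Result<()> {
        // Map a format name to the factory that instantiates tables of that
        // type; DeltaTableFactory is a hypothetical stand-in.
        let mut factories: HashMap<String, Arc<dyn TableProviderFactory>> = HashMap::new();
        factories.insert("deltatable".to_string(), Arc::new(DeltaTableFactory {}));
        let runtime = Arc::new(RuntimeEnv::new(
            RuntimeConfig::new().with_table_factories(factories),
        )?);

        // Point the `default` schema at an ObjectStore root; each subfolder
        // underneath it becomes a candidate table.
        let cfg = SessionConfig::new()
            .set_str("datafusion.catalog.location", "s3://host.example.com:3000/data/tpch")
            .set_str("datafusion.catalog.type", "deltatable");

        let state = SessionState::with_config_rt(cfg, runtime);
        let _ctx = SessionContext::with_state(state);
        Ok(())
    }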

diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
new file mode 100644
index 000000000..f4ec85163
--- /dev/null
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -0,0 +1,163 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! listing_schema contains a SchemaProvider that scans ObjectStores for tables automatically
+use crate::catalog::schema::SchemaProvider;
+use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::TableProvider;
+use datafusion_common::DataFusionError;
+use futures::TryStreamExt;
+use object_store::ObjectStore;
+use std::any::Any;
+use std::collections::{HashMap, HashSet};
+use std::path::Path;
+use std::sync::{Arc, Mutex};
+
+/// A `SchemaProvider` that scans an `ObjectStore` to automatically discover tables
+///
+/// A subfolder relationship is assumed, i.e. given:
+/// authority = s3://host.example.com:3000
+/// path = /data/tpch
+/// factory = `DeltaTableFactory`
+///
+/// A table called "customer" will be registered for the folder:
+/// s3://host.example.com:3000/data/tpch/customer
+///
+/// assuming it contains valid deltalake data, i.e.:
+/// s3://host.example.com:3000/data/tpch/customer/part-00000-xxxx.snappy.parquet
+/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
+pub struct ListingSchemaProvider {
+    authority: String,
+    path: object_store::path::Path,
+    factory: Arc<dyn TableProviderFactory>,
+    store: Arc<dyn ObjectStore>,
+    tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
+}
+
+impl ListingSchemaProvider {
+    /// Create a new `ListingSchemaProvider`
+    ///
+    /// Arguments:
+    /// `authority`: The scheme (e.g. s3://) + host (e.g. example.com:3000)
+    /// `path`: The root path that contains subfolders which represent tables
+    /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
+    /// `store`: The `ObjectStore` containing the table data
+    pub fn new(
+        authority: String,
+        path: object_store::path::Path,
+        factory: Arc<dyn TableProviderFactory>,
+        store: Arc<dyn ObjectStore>,
+    ) -> Self {
+        Self {
+            authority,
+            path,
+            factory,
+            store,
+            tables: Arc::new(Mutex::new(HashMap::new())),
+        }
+    }
+
+    /// Reload table information from ObjectStore
+    pub async fn refresh(&self) -> datafusion_common::Result<()> {
+        let entries: Vec<_> = self
+            .store
+            .list(Some(&self.path))
+            .await?
+            .try_collect()
+            .await?;
+        let base = Path::new(self.path.as_ref());
+        let mut tables = HashSet::new();
+        for file in entries.iter() {
+            let mut parent = Path::new(file.location.as_ref());
+            while let Some(p) = parent.parent() {
+                if p == base {
+                    tables.insert(parent);
+                }
+                parent = p;
+            }
+        }
+        for table in tables.iter() {
+            let file_name = table
+                .file_name()
+                .ok_or_else(|| {
+                    DataFusionError::Internal("Cannot parse file name!".to_string())
+                })?
+                .to_str()
+                .ok_or_else(|| {
+                    DataFusionError::Internal("Cannot parse file name!".to_string())
+                })?;
+            let table_name = table.to_str().ok_or_else(|| {
+                DataFusionError::Internal("Cannot parse file name!".to_string())
+            })?;
+            if !self.table_exist(file_name) {
+                let table_name = format!("{}/{}", self.authority, table_name);
+                let provider = self.factory.create(table_name.as_str()).await?;
+                let _ = self.register_table(file_name.to_string(), provider.clone())?;
+            }
+        }
+        Ok(())
+    }
+}
+
+impl SchemaProvider for ListingSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .keys()
+            .map(|it| it.to_string())
+            .collect()
+    }
+
+    fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .get(name)
+            .cloned()
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .insert(name, table.clone());
+        Ok(Some(table))
+    }
+
+    fn deregister_table(
+        &self,
+        name: &str,
+    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
+        Ok(self.tables.lock().expect("Can't lock tables").remove(name))
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.tables
+            .lock()
+            .expect("Can't lock tables")
+            .contains_key(name)
+    }
+}
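
A sketch of driving the new provider directly, under the same assumptions as
above (`factory` and `store` already constructed). Note that discovery is
explicit: nothing is listed until `refresh` is called.

    use std::sync::Arc;

    use datafusion::catalog::listing_schema::ListingSchemaProvider;
    use datafusion::catalog::schema::SchemaProvider;
    use datafusion::datasource::datasource::TableProviderFactory;
    use object_store::ObjectStore;

    async fn discover(
        factory: Arc<dyn TableProviderFactory>,
        store: Arc<dyn ObjectStore>,
    ) -> datafusion::error::Result<()> {
        let schema = ListingSchemaProvider::new(
            "s3://host.example.com:3000".to_string(), // scheme + host
            object_store::path::Path::parse("/data/tpch").expect("valid path"),
            factory,
            store,
        );
        // Lists the store once and registers a table for each direct child
        // folder of /data/tpch that contains at least one file.
        schema.refresh().await?;
        println!("discovered tables: {:?}", schema.table_names());
        Ok(())
    }
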
diff --git a/datafusion/core/src/catalog/mod.rs b/datafusion/core/src/catalog/mod.rs
index 0720f451e..0eabd8698 100644
--- a/datafusion/core/src/catalog/mod.rs
+++ b/datafusion/core/src/catalog/mod.rs
@@ -21,6 +21,7 @@
 #![allow(clippy::module_inception)]
 pub mod catalog;
 pub mod information_schema;
+pub mod listing_schema;
 pub mod schema;
 
 pub use datafusion_sql::{ResolvedTableReference, TableReference};
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index ea9fa765e..8277cef31 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -68,6 +68,12 @@ pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
 /// Configuration option "datafusion.optimizer.max_passes"
 pub const OPT_OPTIMIZER_MAX_PASSES: &str = "datafusion.optimizer.max_passes";
 
+/// Location scanned to load tables for `default` schema
+pub const OPT_CATALOG_LOCATION: &str = "datafusion.catalog.location";
+
+/// Type of `TableProvider` to use when loading `default` schema
+pub const OPT_CATALOG_TYPE: &str = "datafusion.catalog.type";
+
 /// Definition of a configuration option
 pub struct ConfigDefinition {
     /// key used to identify this configuration option
@@ -144,13 +150,13 @@ impl ConfigDefinition {
     pub fn new_string(
         key: impl Into<String>,
         description: impl Into<String>,
-        default_value: String,
+        default_value: Option<String>,
     ) -> Self {
         Self::new(
             key,
             description,
             DataType::Utf8,
-            ScalarValue::Utf8(Some(default_value)),
+            ScalarValue::Utf8(default_value),
         )
     }
 }
@@ -217,7 +223,7 @@ impl BuiltInConfigs {
                 "The session time zone which some functions require, \
                 e.g. EXTRACT(HOUR FROM SOME_TIME) shifts the underlying datetime according to the time zone,
                 then extracts the hour.",
-                "UTC".into()
+                Some("UTC".into()),
             ),
             ConfigDefinition::new_bool(
                 OPT_PARQUET_PUSHDOWN_FILTERS,
@@ -245,11 +251,22 @@ impl BuiltInConfigs {
                 rule. When set to false, any rules that produce errors will cause the query to fail.",
                 true
             ),
-             ConfigDefinition::new_u64(
-                 OPT_OPTIMIZER_MAX_PASSES,
-                 "Number of times that the optimizer will attempt to optimize the plan",
-                 3
-             )]
+            ConfigDefinition::new_u64(
+                OPT_OPTIMIZER_MAX_PASSES,
+                "Number of times that the optimizer will attempt to optimize the plan",
+                3
+            ),
+            ConfigDefinition::new_string(
+                OPT_CATALOG_LOCATION,
+                "Location scanned to load tables for `default` schema, defaults to None",
+                None,
+            ),
+            ConfigDefinition::new_string(
+                OPT_CATALOG_TYPE,
+                "Type of `TableProvider` to use when loading `default` schema. Defaults to None",
+                None,
+            ),
+            ]
         }
     }
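
With `new_string` now taking an `Option`, a string option can default to NULL.
A small sketch of the distinction (descriptions abbreviated):

    use datafusion::config::{ConfigDefinition, OPT_CATALOG_LOCATION};

    fn example_definitions() -> Vec<ConfigDefinition> {
        vec![
            // No default: auto-registration stays off until a location is set.
            ConfigDefinition::new_string(
                OPT_CATALOG_LOCATION,
                "Location scanned to load tables for `default` schema",
                None,
            ),
            // Existing options keep their defaults by wrapping them in Some.
            ConfigDefinition::new_string(
                "datafusion.execution.time_zone",
                "The session time zone",
                Some("UTC".to_string()),
            ),
        ]
    }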
 
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index d4962f873..0196958d9 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -97,7 +97,10 @@ use datafusion_sql::{
     planner::{ContextProvider, SqlToRel},
 };
 use parquet::file::properties::WriterProperties;
+use url::Url;
 
+use crate::catalog::listing_schema::ListingSchemaProvider;
+use crate::datasource::object_store::ObjectStoreUrl;
 use uuid::Uuid;
 
 use super::options::{
@@ -914,6 +917,11 @@ impl SessionContext {
         state.catalog_list.register_catalog(name, catalog)
     }
 
+    /// Retrieves the list of available catalog names.
+    pub fn catalog_names(&self) -> Vec<String> {
+        self.state.read().catalog_list.catalog_names()
+    }
+
     /// Retrieves a [`CatalogProvider`] instance by name
     pub fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
         self.state.read().catalog_list.catalog(name)
@@ -1258,6 +1266,11 @@ impl SessionConfig {
         self.set(key, ScalarValue::UInt64(Some(value)))
     }
 
+    /// Set a generic `str` configuration option
+    pub fn set_str(self, key: &str, value: &str) -> Self {
+        self.set(key, ScalarValue::Utf8(Some(value.to_string())))
+    }
+
     /// Customize batch size
     pub fn with_batch_size(self, n: usize) -> Self {
         // batch size must be greater than zero
@@ -1508,6 +1521,8 @@ impl SessionState {
                 )
                 .expect("memory catalog provider can register schema");
 
+            Self::register_default_schema(&config, &runtime, &default_catalog);
+
             let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
                 Arc::new(CatalogWithInformationSchema::new(
                     Arc::downgrade(&catalog_list),
@@ -1566,6 +1581,48 @@ impl SessionState {
         }
     }
 
+    fn register_default_schema(
+        config: &SessionConfig,
+        runtime: &Arc<RuntimeEnv>,
+        default_catalog: &MemoryCatalogProvider,
+    ) {
+        let url = config
+            .config_options
+            .read()
+            .get("datafusion.catalog.location");
+        let format = config.config_options.read().get("datafusion.catalog.type");
+        let (url, format) = match (url, format) {
+            (Some(url), Some(format)) => (url, format),
+            _ => return,
+        };
+        if url.is_null() || format.is_null() {
+            return;
+        }
+        let url = url.to_string();
+        let format = format.to_string();
+        let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
+        let authority = match url.host_str() {
+            Some(host) => format!("{}://{}", url.scheme(), host),
+            None => format!("{}://", url.scheme()),
+        };
+        let path = &url.as_str()[authority.len() as usize..];
+        let path = object_store::path::Path::parse(path).expect("Can't parse path");
+        let store = ObjectStoreUrl::parse(authority.as_str())
+            .expect("Invalid default catalog url");
+        let store = match runtime.object_store(store) {
+            Ok(store) => store,
+            _ => return,
+        };
+        let factory = match runtime.table_factories.get(format.as_str()) {
+            Some(factory) => factory,
+            _ => return,
+        };
+        let schema = ListingSchemaProvider::new(authority, path, factory.clone(), store);
+        let _ = default_catalog
+            .register_schema("default", Arc::new(schema))
+            .expect("Failed to register default schema");
+    }
+
     fn resolve_table_ref<'a>(
         &'a self,
         table_ref: impl Into<TableReference<'a>>,
@@ -1947,10 +2004,12 @@ impl FunctionRegistry for TaskContext {
 mod tests {
     use super::*;
     use crate::assert_batches_eq;
+    use crate::datasource::datasource::TableProviderFactory;
     use crate::execution::context::QueryPlanner;
+    use crate::execution::runtime_env::RuntimeConfig;
     use crate::physical_plan::expressions::AvgAccumulator;
     use crate::test;
-    use crate::test_util::parquet_test_data;
+    use crate::test_util::{parquet_test_data, TestTableFactory};
     use crate::variable::VarType;
     use arrow::array::ArrayRef;
     use arrow::datatypes::*;
@@ -1959,9 +2018,10 @@ mod tests {
     use datafusion_expr::{create_udaf, create_udf, Expr, Volatility};
     use datafusion_physical_expr::functions::make_scalar_function;
     use std::fs::File;
+    use std::path::PathBuf;
     use std::sync::Weak;
     use std::thread::{self, JoinHandle};
-    use std::{io::prelude::*, sync::Mutex};
+    use std::{env, io::prelude::*, sync::Mutex};
     use tempfile::TempDir;
 
     #[tokio::test]
@@ -2199,6 +2259,41 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn with_listing_schema_provider() -> Result<()> {
+        let path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
+        let path = path.join("tests/tpch-csv");
+        let url = format!("file://{}", path.display());
+
+        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+            HashMap::new();
+        table_factories.insert("test".to_string(), Arc::new(TestTableFactory {}));
+        let rt_cfg = RuntimeConfig::new().with_table_factories(table_factories);
+        let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
+        let cfg = SessionConfig::new()
+            .set_str("datafusion.catalog.location", url.as_str())
+            .set_str("datafusion.catalog.type", "test");
+        let session_state = SessionState::with_config_rt(cfg, runtime);
+        let ctx = SessionContext::with_state(session_state);
+
+        let mut table_count = 0;
+        for cat_name in ctx.catalog_names().iter() {
+            let cat = ctx.catalog(cat_name).unwrap();
+            for s_name in cat.schema_names().iter() {
+                let schema = cat.schema(s_name).unwrap();
+                if let Some(listing) =
+                    schema.as_any().downcast_ref::<ListingSchemaProvider>()
+                {
+                    listing.refresh().await.unwrap();
+                    table_count = schema.table_names().len();
+                }
+            }
+        }
+
+        assert_eq!(table_count, 8);
+        Ok(())
+    }
+
     #[tokio::test]
     async fn custom_query_planner() -> Result<()> {
         let runtime = Arc::new(RuntimeEnv::default());
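
The authority/path split performed by `register_default_schema` can be seen in
isolation with this self-contained sketch (same logic as the hunk above; note
that `file://` URLs carry no host, so everything after the scheme becomes the
path):

    use url::Url;

    fn split_location(location: &str) -> (String, String) {
        let url = Url::parse(location).expect("Invalid default catalog location!");
        // scheme + host identifies the ObjectStore to look up...
        let authority = match url.host_str() {
            Some(host) => format!("{}://{}", url.scheme(), host),
            None => format!("{}://", url.scheme()),
        };
        // ...and the remainder is the root path that gets listed for tables.
        let path = url.as_str()[authority.len()..].to_string();
        (authority, path)
    }

    fn main() {
        assert_eq!(
            split_location("s3://host.example.com/data/tpch"),
            ("s3://host.example.com".into(), "/data/tpch".into())
        );
        assert_eq!(
            split_location("file:///tmp/tables"),
            ("file://".into(), "/tmp/tables".into())
        );
    }
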
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index 8d0563717..3033f50ed 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -698,6 +698,8 @@ async fn show_all() {
         "+-------------------------------------------------+---------+",
         "| name                                            | setting |",
         "+-------------------------------------------------+---------+",
+        "| datafusion.catalog.location                     | NULL    |",
+        "| datafusion.catalog.type                         | NULL    |",
         "| datafusion.execution.batch_size                 | 8192    |",
         "| datafusion.execution.coalesce_batches           | true    |",
         "| datafusion.execution.coalesce_target_batch_size | 4096    |",
diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md
index 2404131da..0e2810ef2 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -37,16 +37,15 @@ Environment variables are read during `SessionConfig` initialisation so they mus
 
 | key                                             | type    | default | description                                                                                                                                                                                                                                                                                                                                                   |
 | ----------------------------------------------- | ------- | ------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
+| datafusion.catalog.location                     | Utf8    | NULL    | Location scanned to load tables for `default` schema                                                                                                                                                                                                                                                                                                          |
+| datafusion.catalog.type                         | Utf8    | NULL    | Type of `TableProvider` to use when loading `default` schema                                                                                                                                                                                                                                                                                                  |
 | datafusion.execution.batch_size                 | UInt64  | 8192    | Default batch size while creating new batches; it's especially useful for buffer-in-memory batches, since creating tiny batches would result in too much metadata memory consumption.                                                                                                         |
 | datafusion.execution.coalesce_batches           | Boolean | true    | When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting 'datafusion.execution.coalesce_target_batch_size'. |
 | datafusion.execution.coalesce_target_batch_size | UInt64  | 4096    | Target batch size when coalescing batches. Used in conjunction with the configuration setting 'datafusion.execution.coalesce_batches'.                                                                                                                                                        |
 | datafusion.execution.parquet.enable_page_index  | Boolean | false   | If true, uses parquet data page level metadata (Page Index) statistics to reduce the number of rows decoded.                                                                                                                                                                                                                                                  |
 | datafusion.execution.parquet.pushdown_filters   | Boolean | false   | If true, filter expressions are applied during the parquet decoding operation to reduce the number of rows decoded.                                                                                                                                                                           |
 | datafusion.execution.parquet.reorder_filters    | Boolean | false   | If true, filter expressions evaluated during the parquet decoding operation will be reordered heuristically to minimize the cost of evaluation. If false, the filters are applied in the same order as written in the query.                                                                  |
-| datafusion.execution.time_zone                  | Utf8    | UTC     | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,                                                                                                                                                                                                                  |
-| then extract the hour.                          |
-| datafusion.execution.time_zone                  | Utf8    | UTC     | The session time zone which some function require e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,                                                                                                                                                                                                                  |
-| then extract the hour                           |
+| datafusion.execution.time_zone                  | Utf8    | UTC     | The session time zone which some functions require, e.g. EXTRACT(HOUR FROM SOME_TIME) shifts the underlying datetime according to the time zone, then extracts the hour.                                                                                                                      |
 | datafusion.explain.logical_plan_only            | Boolean | false   | When set to true, the explain statement will only print logical plans.                                                                                                                                                                                                                                                                                        |
 | datafusion.explain.physical_plan_only           | Boolean | false   | When set to true, the explain statement will only print physical plans.                                                                                                                                                                                                                                                                                       |
 | datafusion.optimizer.filter_null_join_keys      | Boolean | false   | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down.                                                                                               |
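
Once both options are set and a matching factory is registered, a discovered
table can be queried like any other. A sketch, assuming a `customer` folder
exists under the configured location and `refresh` has been run as in the test
above; the name is qualified because the provider registers under the
`default` schema while the session's default schema remains `public`:

    async fn query_discovered(
        ctx: &datafusion::prelude::SessionContext,
    ) -> datafusion::error::Result<()> {
        // `customer` was never registered explicitly; it was discovered as a
        // subfolder of datafusion.catalog.location.
        let df = ctx
            .sql("SELECT * FROM datafusion.\"default\".customer LIMIT 5")
            .await?;
        df.show().await?;
        Ok(())
    }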