You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/08 09:59:57 UTC

[arrow-datafusion] branch master updated: support `select .. FROM 'parquet.file'` in datafusion-cli (#4838)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new f9b72f423 support `select .. FROM 'parquet.file'` in datafusion-cli (#4838)
f9b72f423 is described below

commit f9b72f4230687b884a92f79d21762578d3d56281
Author: unconsolable <ch...@gmail.com>
AuthorDate: Sun Jan 8 17:59:51 2023 +0800

    support `select .. FROM 'parquet.file'` in datafusion-cli (#4838)
---
 datafusion-cli/Cargo.lock                |   2 +
 datafusion-cli/Cargo.toml                |   2 +
 datafusion-cli/src/catalog.rs            | 165 +++++++++++++++++++++++++++++++
 datafusion-cli/src/lib.rs                |   1 +
 datafusion-cli/src/main.rs               |   6 ++
 datafusion/core/src/execution/context.rs |  17 +++-
 6 files changed, 192 insertions(+), 1 deletion(-)

diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index f9d3437f3..e4bc1b20d 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -701,12 +701,14 @@ name = "datafusion-cli"
 version = "16.0.0"
 dependencies = [
  "arrow",
+ "async-trait",
  "clap",
  "datafusion",
  "dirs",
  "env_logger",
  "mimalloc",
  "object_store",
+ "parking_lot",
  "rustyline",
  "tokio",
  "url",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index d34e4b1d6..7ad2041e6 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -30,12 +30,14 @@ readme = "README.md"
 
 [dependencies]
 arrow = "29.0.0"
+async-trait = "0.1.41"
 clap = { version = "3", features = ["derive", "cargo"] }
 datafusion = { path = "../datafusion/core", version = "16.0.0" }
 dirs = "4.0.0"
 env_logger = "0.9"
 mimalloc = { version = "0.1", default-features = false }
 object_store = { version = "0.5.0", features = ["aws", "gcp"] }
+parking_lot = { version = "0.12" }
 rustyline = "10.0"
 tokio = { version = "1.23", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
 url = "2.2"
diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
new file mode 100644
index 000000000..dbd6751a4
--- /dev/null
+++ b/datafusion-cli/src/catalog.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use datafusion::catalog::catalog::{CatalogList, CatalogProvider};
+use datafusion::catalog::schema::SchemaProvider;
+use datafusion::datasource::listing::{
+    ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::TableProvider;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionState;
+use parking_lot::RwLock;
+use std::any::Any;
+use std::sync::{Arc, Weak};
+
+/// Wraps another catalog, automatically creating table providers
+/// for local files if needed
+pub struct DynamicFileCatalog {
+    inner: Arc<dyn CatalogList>,
+    state: Weak<RwLock<SessionState>>,
+}
+
+impl DynamicFileCatalog {
+    pub fn new(inner: Arc<dyn CatalogList>, state: Weak<RwLock<SessionState>>) -> Self {
+        Self { inner, state }
+    }
+}
+
+impl CatalogList for DynamicFileCatalog {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn register_catalog(
+        &self,
+        name: String,
+        catalog: Arc<dyn CatalogProvider>,
+    ) -> Option<Arc<dyn CatalogProvider>> {
+        self.inner.register_catalog(name, catalog)
+    }
+
+    fn catalog_names(&self) -> Vec<String> {
+        self.inner.catalog_names()
+    }
+
+    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+        let state = self.state.clone();
+        self.inner
+            .catalog(name)
+            .map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog, state)) as _)
+    }
+}
+
+/// Wraps another catalog provider
+struct DynamicFileCatalogProvider {
+    inner: Arc<dyn CatalogProvider>,
+    state: Weak<RwLock<SessionState>>,
+}
+
+impl DynamicFileCatalogProvider {
+    pub fn new(
+        inner: Arc<dyn CatalogProvider>,
+        state: Weak<RwLock<SessionState>>,
+    ) -> Self {
+        Self { inner, state }
+    }
+}
+
+impl CatalogProvider for DynamicFileCatalogProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema_names(&self) -> Vec<String> {
+        self.inner.schema_names()
+    }
+
+    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+        let state = self.state.clone();
+        self.inner
+            .schema(name)
+            .map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema, state)) as _)
+    }
+
+    fn register_schema(
+        &self,
+        name: &str,
+        schema: Arc<dyn SchemaProvider>,
+    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+        self.inner.register_schema(name, schema)
+    }
+}
+
+/// Wraps another schema provider
+struct DynamicFileSchemaProvider {
+    inner: Arc<dyn SchemaProvider>,
+    state: Weak<RwLock<SessionState>>,
+}
+
+impl DynamicFileSchemaProvider {
+    pub fn new(
+        inner: Arc<dyn SchemaProvider>,
+        state: Weak<RwLock<SessionState>>,
+    ) -> Self {
+        Self { inner, state }
+    }
+}
+
+#[async_trait]
+impl SchemaProvider for DynamicFileSchemaProvider {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn table_names(&self) -> Vec<String> {
+        self.inner.table_names()
+    }
+
+    fn register_table(
+        &self,
+        name: String,
+        table: Arc<dyn TableProvider>,
+    ) -> Result<Option<Arc<dyn TableProvider>>> {
+        self.inner.register_table(name, table)
+    }
+
+    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+        let inner_table = self.inner.table(name).await;
+        if inner_table.is_some() {
+            return inner_table;
+        }
+
+        // if the inner schema provider didn't have a table by
+        // that name, try to treat it as a listing table
+        let state = self.state.upgrade()?.read().clone();
+        let config = ListingTableConfig::new(ListingTableUrl::parse(name).ok()?)
+            .infer(&state)
+            .await
+            .ok()?;
+        Some(Arc::new(ListingTable::try_new(config).ok()?))
+    }
+
+    fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+        self.inner.deregister_table(name)
+    }
+
+    fn table_exist(&self, name: &str) -> bool {
+        self.inner.table_exist(name)
+    }
+}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 44d8f0610..7eb3cb51c 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -18,6 +18,7 @@
 #![doc = include_str!("../README.md")]
 pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
 
+pub mod catalog;
 pub mod command;
 pub mod exec;
 pub mod functions;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index b9ada1a1f..fa4adce14 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -21,6 +21,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::prelude::SessionContext;
+use datafusion_cli::catalog::DynamicFileCatalog;
 use datafusion_cli::object_storage::DatafusionCliObjectStoreProvider;
 use datafusion_cli::{
     exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
@@ -106,6 +107,11 @@ pub async fn main() -> Result<()> {
     let mut ctx =
         SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env));
     ctx.refresh_catalogs().await?;
+    // install dynamic catalog provider that knows how to open files
+    ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
+        ctx.state().catalog_list(),
+        ctx.state_weak_ref(),
+    )));
 
     let mut print_options = PrintOptions {
         format: args.format,
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ba8accdce..9d652f811 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -30,7 +30,6 @@ use crate::{
 pub use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::var_provider::is_system_variables;
 use parking_lot::RwLock;
-use std::ops::ControlFlow;
 use std::sync::Arc;
 use std::{
     any::{Any, TypeId},
@@ -41,6 +40,7 @@ use std::{
     collections::{HashMap, HashSet},
     fmt::Debug,
 };
+use std::{ops::ControlFlow, sync::Weak};
 
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::record_batch::RecordBatch;
@@ -1009,6 +1009,16 @@ impl SessionContext {
         state.execution_props.start_execution();
         state
     }
+
+    /// Get weak reference to [`SessionState`]
+    pub fn state_weak_ref(&self) -> Weak<RwLock<SessionState>> {
+        Arc::downgrade(&self.state)
+    }
+
+    /// Register [`CatalogList`] in [`SessionState`]
+    pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogList>) {
+        self.state.write().catalog_list = catalog_list;
+    }
 }
 
 impl FunctionRegistry for SessionContext {
@@ -1788,6 +1798,11 @@ impl SessionState {
     pub fn task_ctx(&self) -> Arc<TaskContext> {
         Arc::new(TaskContext::from(self))
     }
+
+    /// Return catalog list
+    pub fn catalog_list(&self) -> Arc<dyn CatalogList> {
+        self.catalog_list.clone()
+    }
 }
 
 struct SessionContextProvider<'a> {