You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/01/08 09:59:57 UTC
[arrow-datafusion] branch master updated: support `select .. FROM 'parquet.file'` in datafusion-cli (#4838)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new f9b72f423 support `select .. FROM 'parquet.file'` in datafusion-cli (#4838)
f9b72f423 is described below
commit f9b72f4230687b884a92f79d21762578d3d56281
Author: unconsolable <ch...@gmail.com>
AuthorDate: Sun Jan 8 17:59:51 2023 +0800
support `select .. FROM 'parquet.file'` in datafusion-cli (#4838)
---
datafusion-cli/Cargo.lock | 2 +
datafusion-cli/Cargo.toml | 2 +
datafusion-cli/src/catalog.rs | 165 +++++++++++++++++++++++++++++++
datafusion-cli/src/lib.rs | 1 +
datafusion-cli/src/main.rs | 6 ++
datafusion/core/src/execution/context.rs | 17 +++-
6 files changed, 192 insertions(+), 1 deletion(-)
diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock
index f9d3437f3..e4bc1b20d 100644
--- a/datafusion-cli/Cargo.lock
+++ b/datafusion-cli/Cargo.lock
@@ -701,12 +701,14 @@ name = "datafusion-cli"
version = "16.0.0"
dependencies = [
"arrow",
+ "async-trait",
"clap",
"datafusion",
"dirs",
"env_logger",
"mimalloc",
"object_store",
+ "parking_lot",
"rustyline",
"tokio",
"url",
diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml
index d34e4b1d6..7ad2041e6 100644
--- a/datafusion-cli/Cargo.toml
+++ b/datafusion-cli/Cargo.toml
@@ -30,12 +30,14 @@ readme = "README.md"
[dependencies]
arrow = "29.0.0"
+async-trait = "0.1.41"
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = { path = "../datafusion/core", version = "16.0.0" }
dirs = "4.0.0"
env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
object_store = { version = "0.5.0", features = ["aws", "gcp"] }
+parking_lot = { version = "0.12" }
rustyline = "10.0"
tokio = { version = "1.23", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }
url = "2.2"
diff --git a/datafusion-cli/src/catalog.rs b/datafusion-cli/src/catalog.rs
new file mode 100644
index 000000000..dbd6751a4
--- /dev/null
+++ b/datafusion-cli/src/catalog.rs
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use datafusion::catalog::catalog::{CatalogList, CatalogProvider};
+use datafusion::catalog::schema::SchemaProvider;
+use datafusion::datasource::listing::{
+ ListingTable, ListingTableConfig, ListingTableUrl,
+};
+use datafusion::datasource::TableProvider;
+use datafusion::error::Result;
+use datafusion::execution::context::SessionState;
+use parking_lot::RwLock;
+use std::any::Any;
+use std::sync::{Arc, Weak};
+
+/// Wraps another catalog list, automatically creating table providers
+/// for local files if needed (the fallback itself lives in `DynamicFileSchemaProvider::table`)
+pub struct DynamicFileCatalog {
+ inner: Arc<dyn CatalogList>, // wrapped catalog list; catalog registration and lookup are forwarded to it
+ state: Weak<RwLock<SessionState>>, // handed down to the providers this type creates; presumably weak to avoid a state <-> catalog-list cycle — confirm
+}
+
+impl DynamicFileCatalog {
+ pub fn new(inner: Arc<dyn CatalogList>, state: Weak<RwLock<SessionState>>) -> Self {
+ Self { inner, state } // stores both handles as given; the weak reference is not upgraded here
+ }
+}
+
+impl CatalogList for DynamicFileCatalog { // forwards to `inner`; only `catalog` adds behaviour (wrapping)
+ fn as_any(&self) -> &dyn Any {
+ self // exposes the wrapper itself, not the wrapped list
+ }
+
+ fn register_catalog(
+ &self,
+ name: String,
+ catalog: Arc<dyn CatalogProvider>,
+ ) -> Option<Arc<dyn CatalogProvider>> {
+ self.inner.register_catalog(name, catalog) // stored unwrapped; wrapping happens on lookup in `catalog`
+ }
+
+ fn catalog_names(&self) -> Vec<String> {
+ self.inner.catalog_names()
+ }
+
+ fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
+ let state = self.state.clone(); // clones the `Weak` handle so it can be moved into the closure
+ self.inner
+ .catalog(name) // `None` from the inner list is passed through unchanged
+ .map(|catalog| Arc::new(DynamicFileCatalogProvider::new(catalog, state)) as _) // a new wrapper is allocated on every successful lookup
+ }
+}
+
+/// Wraps another catalog provider so that each schema it returns is wrapped in a `DynamicFileSchemaProvider`
+struct DynamicFileCatalogProvider {
+ inner: Arc<dyn CatalogProvider>, // wrapped provider; schema listing, lookup and registration are forwarded to it
+ state: Weak<RwLock<SessionState>>, // passed on to every `DynamicFileSchemaProvider` created in `schema`
+}
+
+impl DynamicFileCatalogProvider {
+ pub fn new( // `pub` on a private struct: only reachable from within this module
+ inner: Arc<dyn CatalogProvider>,
+ state: Weak<RwLock<SessionState>>,
+ ) -> Self {
+ Self { inner, state } // stores both handles as given
+ }
+}
+
+impl CatalogProvider for DynamicFileCatalogProvider { // forwards to `inner`; only `schema` adds behaviour (wrapping)
+ fn as_any(&self) -> &dyn Any {
+ self // exposes the wrapper itself, not the wrapped provider
+ }
+
+ fn schema_names(&self) -> Vec<String> {
+ self.inner.schema_names()
+ }
+
+ fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
+ let state = self.state.clone(); // clones the `Weak` handle so it can be moved into the closure
+ self.inner
+ .schema(name) // `None` from the inner provider is passed through unchanged
+ .map(|schema| Arc::new(DynamicFileSchemaProvider::new(schema, state)) as _) // a new wrapper is allocated on every successful lookup
+ }
+
+ fn register_schema(
+ &self,
+ name: &str,
+ schema: Arc<dyn SchemaProvider>,
+ ) -> Result<Option<Arc<dyn SchemaProvider>>> {
+ self.inner.register_schema(name, schema) // stored unwrapped; wrapping happens on lookup in `schema`
+ }
+}
+
+/// Wraps another schema provider; when the inner provider has no table of a given name, `table` tries to build a `ListingTable` from that name
+struct DynamicFileSchemaProvider {
+ inner: Arc<dyn SchemaProvider>, // wrapped provider; always consulted first
+ state: Weak<RwLock<SessionState>>, // upgraded in `table` to obtain the `SessionState` passed to `ListingTableConfig::infer`
+}
+
+impl DynamicFileSchemaProvider {
+ pub fn new( // `pub` on a private struct: only reachable from within this module
+ inner: Arc<dyn SchemaProvider>,
+ state: Weak<RwLock<SessionState>>,
+ ) -> Self {
+ Self { inner, state } // stores both handles as given
+ }
+}
+
+#[async_trait]
+impl SchemaProvider for DynamicFileSchemaProvider { // forwards to `inner`; only `table` adds behaviour (listing-table fallback)
+ fn as_any(&self) -> &dyn Any {
+ self // exposes the wrapper itself, not the wrapped provider
+ }
+
+ fn table_names(&self) -> Vec<String> {
+ self.inner.table_names() // only the inner provider's names; tables built by the fallback in `table` are never registered
+ }
+
+ fn register_table(
+ &self,
+ name: String,
+ table: Arc<dyn TableProvider>,
+ ) -> Result<Option<Arc<dyn TableProvider>>> {
+ self.inner.register_table(name, table)
+ }
+
+ async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
+ let inner_table = self.inner.table(name).await;
+ if inner_table.is_some() {
+ return inner_table; // a table known to the inner provider always wins over the fallback
+ }
+
+ // if the inner schema provider didn't have a table by
+ // that name, try to treat the name as a listing table URL; any failure below yields `None`
+ let state = self.state.upgrade()?.read().clone(); // `None` if the session state is gone; the state is cloned out and the read guard is dropped at the end of this statement, before the `.await` below
+ let config = ListingTableConfig::new(ListingTableUrl::parse(name).ok()?) // the table name itself is parsed as the URL
+ .infer(&state) // presumably infers file format and schema from the location — confirm against `ListingTableConfig::infer`
+ .await
+ .ok()?; // the error is discarded, so the caller only sees "no such table"
+ Some(Arc::new(ListingTable::try_new(config).ok()?)) // built fresh on each call and not registered with `inner`
+ }
+
+ fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
+ self.inner.deregister_table(name)
+ }
+
+ fn table_exist(&self, name: &str) -> bool {
+ self.inner.table_exist(name) // NOTE(review): consults only the inner provider, so a name that `table` could resolve as a file may be reported as absent here — confirm this is intended
+ }
+}
diff --git a/datafusion-cli/src/lib.rs b/datafusion-cli/src/lib.rs
index 44d8f0610..7eb3cb51c 100644
--- a/datafusion-cli/src/lib.rs
+++ b/datafusion-cli/src/lib.rs
@@ -18,6 +18,7 @@
#![doc = include_str!("../README.md")]
pub const DATAFUSION_CLI_VERSION: &str = env!("CARGO_PKG_VERSION");
+pub mod catalog;
pub mod command;
pub mod exec;
pub mod functions;
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index b9ada1a1f..fa4adce14 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -21,6 +21,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::SessionContext;
+use datafusion_cli::catalog::DynamicFileCatalog;
use datafusion_cli::object_storage::DatafusionCliObjectStoreProvider;
use datafusion_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
@@ -106,6 +107,11 @@ pub async fn main() -> Result<()> {
let mut ctx =
SessionContext::with_config_rt(session_config.clone(), Arc::new(runtime_env));
ctx.refresh_catalogs().await?;
+ // install dynamic catalog provider that knows how to open files
+ ctx.register_catalog_list(Arc::new(DynamicFileCatalog::new(
+ ctx.state().catalog_list(),
+ ctx.state_weak_ref(),
+ )));
let mut print_options = PrintOptions {
format: args.format,
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index ba8accdce..9d652f811 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -30,7 +30,6 @@ use crate::{
pub use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::var_provider::is_system_variables;
use parking_lot::RwLock;
-use std::ops::ControlFlow;
use std::sync::Arc;
use std::{
any::{Any, TypeId},
@@ -41,6 +40,7 @@ use std::{
collections::{HashMap, HashSet},
fmt::Debug,
};
+use std::{ops::ControlFlow, sync::Weak};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::record_batch::RecordBatch;
@@ -1009,6 +1009,16 @@ impl SessionContext {
state.execution_props.start_execution();
state
}
+
+ /// Get a weak reference to the lock-wrapped [`SessionState`]; holding it does not keep the state alive
+ pub fn state_weak_ref(&self) -> Weak<RwLock<SessionState>> {
+ Arc::downgrade(&self.state)
+ }
+
+ /// Replace the [`CatalogList`] stored in [`SessionState`] with `catalog_list`; the previous list is overwritten and not returned
+ pub fn register_catalog_list(&mut self, catalog_list: Arc<dyn CatalogList>) {
+ self.state.write().catalog_list = catalog_list; // takes the state's write lock for the duration of the assignment
+ }
}
impl FunctionRegistry for SessionContext {
@@ -1788,6 +1798,11 @@ impl SessionState {
pub fn task_ctx(&self) -> Arc<TaskContext> {
Arc::new(TaskContext::from(self))
}
+
+ /// Return a new `Arc` handle to this state's catalog list (the handle is cloned, not the list)
+ pub fn catalog_list(&self) -> Arc<dyn CatalogList> {
+ self.catalog_list.clone()
+ }
}
struct SessionContextProvider<'a> {