You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/17 15:17:05 UTC
[arrow-datafusion] branch master updated: refactor how we create listing tables (#4227)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new e18f7bac2 refactor how we create listing tables (#4227)
e18f7bac2 is described below
commit e18f7bac209987dcc95c1bfa01de9c13ef75efe4
Author: Tim Van Wassenhove <ti...@timvw.be>
AuthorDate: Thu Nov 17 16:16:59 2022 +0100
refactor how we create listing tables (#4227)
* refactor how we create listing tables
* run linting
* attempt to register default table factories
* pass csv as catalog type
* ensure that ListingSchemaProvider can keep working for CSV with header
* no need to register default table factories
* remove unused imports
* use builder for listingoptions
* remove comment
* fix roundtrip test
* Update datafusion/core/src/datasource/listing_table_factory.rs
Co-authored-by: Andy Grove <an...@gmail.com>
* register a listingtablefactory for NDJSON
* always verify that a factory is available
* allow NDJSON as an alias for JSON
Co-authored-by: Andy Grove <an...@gmail.com>
---
datafusion-cli/src/main.rs | 26 +---
datafusion/core/src/catalog/listing_schema.rs | 12 +-
.../core/src/datasource/file_format/file_type.rs | 2 +-
.../core/src/datasource/listing_table_factory.rs | 69 ++++++++--
datafusion/core/src/execution/context.rs | 145 +++++++--------------
datafusion/core/src/execution/runtime_env.rs | 13 +-
datafusion/core/tests/sql/create_drop.rs | 16 +--
datafusion/proto/src/lib.rs | 2 +-
datafusion/proto/src/logical_plan.rs | 18 +--
9 files changed, 141 insertions(+), 162 deletions(-)
diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index f5c2efb22..a6a3daa2e 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -16,9 +16,6 @@
// under the License.
use clap::Parser;
-use datafusion::datasource::datasource::TableProviderFactory;
-use datafusion::datasource::file_format::file_type::FileType;
-use datafusion::datasource::listing_table_factory::ListingTableFactory;
use datafusion::datasource::object_store::ObjectStoreRegistry;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::context::SessionConfig;
@@ -29,7 +26,6 @@ use datafusion_cli::{
exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
};
use mimalloc::MiMalloc;
-use std::collections::HashMap;
use std::env;
use std::path::Path;
use std::sync::Arc;
@@ -147,31 +143,11 @@ pub async fn main() -> Result<()> {
}
fn create_runtime_env() -> Result<RuntimeEnv> {
- let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
- HashMap::new();
- table_factories.insert(
- "csv".to_string(),
- Arc::new(ListingTableFactory::new(FileType::CSV)),
- );
- table_factories.insert(
- "parquet".to_string(),
- Arc::new(ListingTableFactory::new(FileType::PARQUET)),
- );
- table_factories.insert(
- "avro".to_string(),
- Arc::new(ListingTableFactory::new(FileType::AVRO)),
- );
- table_factories.insert(
- "json".to_string(),
- Arc::new(ListingTableFactory::new(FileType::JSON)),
- );
-
let object_store_provider = DatafusionCliObjectStoreProvider {};
let object_store_registry =
ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
let rn_config = RuntimeConfig::new()
- .with_object_store_registry(Arc::new(object_store_registry))
- .with_table_factories(table_factories);
+ .with_object_store_registry(Arc::new(object_store_registry));
RuntimeEnv::new(rn_config)
}
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index bac2724a0..6e8dcd5b2 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -49,6 +49,8 @@ pub struct ListingSchemaProvider {
factory: Arc<dyn TableProviderFactory>,
store: Arc<dyn ObjectStore>,
tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
+ format: String,
+ has_header: bool,
}
impl ListingSchemaProvider {
@@ -59,11 +61,15 @@ impl ListingSchemaProvider {
/// `path`: The root path that contains subfolders which represent tables
/// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
/// `store`: The `ObjectStore` containing the table data
+ /// `format`: The `FileFormat` of the tables
+ /// `has_header`: Indicates whether the created external table has the has_header flag enabled
pub fn new(
authority: String,
path: object_store::path::Path,
factory: Arc<dyn TableProviderFactory>,
store: Arc<dyn ObjectStore>,
+ format: String,
+ has_header: bool,
) -> Self {
Self {
authority,
@@ -71,6 +77,8 @@ impl ListingSchemaProvider {
factory,
store,
tables: Arc::new(Mutex::new(HashMap::new())),
+ format,
+ has_header,
}
}
@@ -118,8 +126,8 @@ impl ListingSchemaProvider {
schema: Arc::new(DFSchema::empty()),
name: table_name.to_string(),
location: table_url,
- file_type: "".to_string(),
- has_header: false,
+ file_type: self.format.clone(),
+ has_header: self.has_header,
delimiter: ',',
table_partition_cols: vec![],
if_not_exists: false,
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index f08a21ca1..78f6dbc9a 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -158,7 +158,7 @@ impl FromStr for FileType {
"AVRO" => Ok(FileType::AVRO),
"PARQUET" => Ok(FileType::PARQUET),
"CSV" => Ok(FileType::CSV),
- "JSON" => Ok(FileType::JSON),
+ "JSON" | "NDJSON" => Ok(FileType::JSON),
_ => Err(DataFusionError::NotImplemented(format!(
"Unknown FileType: {}",
s
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 7bd798f8e..3662ab361 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -20,7 +20,7 @@
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::file_format::avro::AvroFormat;
use crate::datasource::file_format::csv::CsvFormat;
-use crate::datasource::file_format::file_type::{FileType, GetExt};
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::file_format::json::JsonFormat;
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::file_format::FileFormat;
@@ -30,18 +30,24 @@ use crate::datasource::listing::{
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use async_trait::async_trait;
+use datafusion_common::DataFusionError;
use datafusion_expr::CreateExternalTable;
+use std::str::FromStr;
use std::sync::Arc;
/// A `TableProviderFactory` capable of creating new `ListingTable`s
-pub struct ListingTableFactory {
- file_type: FileType,
-}
+pub struct ListingTableFactory {}
impl ListingTableFactory {
/// Creates a new `ListingTableFactory`
- pub fn new(file_type: FileType) -> Self {
- Self { file_type }
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl Default for ListingTableFactory {
+ fn default() -> Self {
+ Self::new()
}
}
@@ -52,24 +58,59 @@ impl TableProviderFactory for ListingTableFactory {
state: &SessionState,
cmd: &CreateExternalTable,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
- let file_extension = self.file_type.get_ext();
+ let file_compression_type = FileCompressionType::from_str(
+ cmd.file_compression_type.as_str(),
+ )
+ .map_err(|_| {
+ DataFusionError::Execution(format!(
+ "Unknown FileCompressionType {}",
+ cmd.file_compression_type.as_str()
+ ))
+ })?;
+ let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
+ DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
+ })?;
- let file_format: Arc<dyn FileFormat> = match self.file_type {
- FileType::CSV => Arc::new(CsvFormat::default()),
+ let file_extension =
+ file_type.get_ext_with_compression(file_compression_type.to_owned())?;
+
+ let file_format: Arc<dyn FileFormat> = match file_type {
+ FileType::CSV => Arc::new(
+ CsvFormat::default()
+ .with_has_header(cmd.has_header)
+ .with_delimiter(cmd.delimiter as u8)
+ .with_file_compression_type(file_compression_type),
+ ),
FileType::PARQUET => Arc::new(ParquetFormat::default()),
FileType::AVRO => Arc::new(AvroFormat::default()),
- FileType::JSON => Arc::new(JsonFormat::default()),
+ FileType::JSON => Arc::new(
+ JsonFormat::default().with_file_compression_type(file_compression_type),
+ ),
+ };
+
+ let provided_schema = if cmd.schema.fields().is_empty() {
+ None
+ } else {
+ Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
};
- let options =
- ListingOptions::new(file_format).with_file_extension(file_extension);
+ let options = ListingOptions::new(file_format)
+ .with_collect_stat(state.config.collect_statistics)
+ .with_file_extension(file_extension)
+ .with_target_partitions(state.config.target_partitions)
+ .with_table_partition_cols(cmd.table_partition_cols.clone())
+ .with_file_sort_order(None);
let table_path = ListingTableUrl::parse(&cmd.location)?;
- let resolved_schema = options.infer_schema(state, &table_path).await?;
+ let resolved_schema = match provided_schema {
+ None => options.infer_schema(state, &table_path).await?,
+ Some(s) => s,
+ };
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(resolved_schema);
- let table = ListingTable::try_new(config)?;
+ let table =
+ ListingTable::try_new(config)?.with_definition(cmd.definition.clone());
Ok(Arc::new(table))
}
}
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 216e49f4a..b793c9c68 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -22,13 +22,7 @@ use crate::{
information_schema::CatalogWithInformationSchema,
},
datasource::listing::{ListingOptions, ListingTable},
- datasource::{
- file_format::{
- avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
- FileFormat,
- },
- MemTable, ViewTable,
- },
+ datasource::{MemTable, ViewTable},
logical_expr::{PlanType, ToStringifiedPlan},
optimizer::optimizer::Optimizer,
physical_optimizer::{
@@ -79,7 +73,6 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
-use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
use crate::physical_optimizer::enforcement::BasicEnforcement;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -252,12 +245,9 @@ impl SessionContext {
pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
let plan = self.create_logical_plan(sql)?;
match plan {
- LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
- "PARQUET" | "CSV" | "JSON" | "AVRO" => {
- self.create_listing_table(&cmd).await
- }
- _ => self.create_custom_table(&cmd).await,
- },
+ LogicalPlan::CreateExternalTable(cmd) => {
+ self.create_external_table(&cmd).await
+ }
LogicalPlan::CreateMemoryTable(CreateMemoryTable {
name,
@@ -490,88 +480,18 @@ impl SessionContext {
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
- async fn create_custom_table(
- &self,
- cmd: &CreateExternalTable,
- ) -> Result<Arc<DataFrame>> {
- let state = self.state.read().clone();
- let file_type = cmd.file_type.to_lowercase();
- let factory = &state
- .runtime_env
- .table_factories
- .get(file_type.as_str())
- .ok_or_else(|| {
- DataFusionError::Execution(format!(
- "Unable to find factory for {}",
- cmd.file_type
- ))
- })?;
- let table = (*factory).create(&state, cmd).await?;
- self.register_table(cmd.name.as_str(), table)?;
- let plan = LogicalPlanBuilder::empty(false).build()?;
- Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
- }
-
- async fn create_listing_table(
+ async fn create_external_table(
&self,
cmd: &CreateExternalTable,
) -> Result<Arc<DataFrame>> {
- let file_compression_type =
- match FileCompressionType::from_str(cmd.file_compression_type.as_str()) {
- Ok(t) => t,
- Err(_) => Err(DataFusionError::Execution(
- "Only known FileCompressionTypes can be ListingTables!".to_string(),
- ))?,
- };
-
- let file_type = match FileType::from_str(cmd.file_type.as_str()) {
- Ok(t) => t,
- Err(_) => Err(DataFusionError::Execution(
- "Only known FileTypes can be ListingTables!".to_string(),
- ))?,
- };
-
- let file_extension =
- file_type.get_ext_with_compression(file_compression_type.to_owned())?;
-
- let file_format: Arc<dyn FileFormat> = match file_type {
- FileType::CSV => Arc::new(
- CsvFormat::default()
- .with_has_header(cmd.has_header)
- .with_delimiter(cmd.delimiter as u8)
- .with_file_compression_type(file_compression_type),
- ),
- FileType::PARQUET => Arc::new(ParquetFormat::default()),
- FileType::AVRO => Arc::new(AvroFormat::default()),
- FileType::JSON => Arc::new(
- JsonFormat::default().with_file_compression_type(file_compression_type),
- ),
- };
+ let table_provider: Arc<dyn TableProvider> =
+ self.create_custom_table(cmd).await?;
let table = self.table(cmd.name.as_str());
match (cmd.if_not_exists, table) {
(true, Ok(_)) => self.return_empty_dataframe(),
(_, Err(_)) => {
- // TODO make schema in CreateExternalTable optional instead of empty
- let provided_schema = if cmd.schema.fields().is_empty() {
- None
- } else {
- Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
- };
- let options = ListingOptions::new(file_format)
- .with_collect_stat(self.copied_config().collect_statistics)
- .with_file_extension(file_extension)
- .with_target_partitions(self.copied_config().target_partitions)
- .with_table_partition_cols(cmd.table_partition_cols.clone());
-
- self.register_listing_table(
- cmd.name.as_str(),
- cmd.location.clone(),
- options,
- provided_schema,
- cmd.definition.clone(),
- )
- .await?;
+ self.register_table(cmd.name.as_str(), table_provider)?;
self.return_empty_dataframe()
}
(false, Ok(_)) => Err(DataFusionError::Execution(format!(
@@ -581,6 +501,26 @@ impl SessionContext {
}
}
+ async fn create_custom_table(
+ &self,
+ cmd: &CreateExternalTable,
+ ) -> Result<Arc<dyn TableProvider>> {
+ let state = self.state.read().clone();
+ let file_type = cmd.file_type.to_uppercase();
+ let factory = &state
+ .runtime_env
+ .table_factories
+ .get(file_type.as_str())
+ .ok_or_else(|| {
+ DataFusionError::Execution(format!(
+ "Unable to find factory for {}",
+ cmd.file_type
+ ))
+ })?;
+ let table = (*factory).create(&state, cmd).await?;
+ Ok(table)
+ }
+
fn find_and_deregister<'a>(
&self,
table_ref: impl Into<TableReference<'a>>,
@@ -1625,6 +1565,15 @@ impl SessionState {
}
let url = url.to_string();
let format = format.to_string();
+
+ let has_header = config
+ .config_options
+ .read()
+ .get("datafusion.catalog.has_header");
+ let has_header: bool = has_header
+ .map(|x| FromStr::from_str(&x.to_string()).unwrap_or_default())
+ .unwrap_or_default();
+
let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
let authority = match url.host_str() {
Some(host) => format!("{}://{}", url.scheme(), host),
@@ -1642,7 +1591,14 @@ impl SessionState {
Some(factory) => factory,
_ => return,
};
- let schema = ListingSchemaProvider::new(authority, path, factory.clone(), store);
+ let schema = ListingSchemaProvider::new(
+ authority,
+ path,
+ factory.clone(),
+ store,
+ format,
+ has_header,
+ );
let _ = default_catalog
.register_schema("default", Arc::new(schema))
.expect("Failed to register default schema");
@@ -2033,8 +1989,6 @@ impl FunctionRegistry for TaskContext {
mod tests {
use super::*;
use crate::assert_batches_eq;
- use crate::datasource::datasource::TableProviderFactory;
- use crate::datasource::listing_table_factory::ListingTableFactory;
use crate::execution::context::QueryPlanner;
use crate::execution::runtime_env::RuntimeConfig;
use crate::physical_plan::expressions::AvgAccumulator;
@@ -2295,15 +2249,12 @@ mod tests {
let path = path.join("tests/tpch-csv");
let url = format!("file://{}", path.display());
- let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
- HashMap::new();
- let factory = Arc::new(ListingTableFactory::new(FileType::CSV));
- table_factories.insert("test".to_string(), factory);
- let rt_cfg = RuntimeConfig::new().with_table_factories(table_factories);
+ let rt_cfg = RuntimeConfig::new();
let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
let cfg = SessionConfig::new()
.set_str("datafusion.catalog.location", url.as_str())
- .set_str("datafusion.catalog.type", "test");
+ .set_str("datafusion.catalog.type", "CSV")
+ .set_str("datafusion.catalog.has_header", "true");
let session_state = SessionState::with_config_rt(cfg, runtime);
let ctx = SessionContext::with_state(session_state);
ctx.refresh_catalogs().await?;
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 7c3e9b4e6..64da4a103 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -28,6 +28,7 @@ use crate::{
use std::collections::HashMap;
use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::listing_table_factory::ListingTableFactory;
use crate::datasource::object_store::ObjectStoreRegistry;
use datafusion_common::DataFusionError;
use object_store::ObjectStore;
@@ -152,7 +153,17 @@ pub struct RuntimeConfig {
impl RuntimeConfig {
/// New with default values
pub fn new() -> Self {
- Default::default()
+ let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+ HashMap::new();
+ table_factories.insert("PARQUET".into(), Arc::new(ListingTableFactory::new()));
+ table_factories.insert("CSV".into(), Arc::new(ListingTableFactory::new()));
+ table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new()));
+ table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new()));
+ table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new()));
+ Self {
+ table_factories,
+ ..Default::default()
+ }
}
/// Customize disk manager
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 143c53f18..452f41b7d 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-use std::collections::HashMap;
use std::io::Write;
-use datafusion::datasource::datasource::TableProviderFactory;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::test_util::TestTableFactory;
use tempfile::TempDir;
@@ -433,10 +431,9 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
#[tokio::test]
async fn create_custom_table() -> Result<()> {
- let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
- HashMap::new();
- table_factories.insert("deltatable".to_string(), Arc::new(TestTableFactory {}));
- let cfg = RuntimeConfig::new().with_table_factories(table_factories);
+ let mut cfg = RuntimeConfig::new();
+ cfg.table_factories
+ .insert("DELTATABLE".to_string(), Arc::new(TestTableFactory {}));
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
let ctx = SessionContext::with_config_rt(ses, Arc::new(env));
@@ -454,10 +451,9 @@ async fn create_custom_table() -> Result<()> {
#[tokio::test]
async fn create_external_table_with_ddl() -> Result<()> {
- let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
- HashMap::new();
- table_factories.insert("mocktable".to_string(), Arc::new(TestTableFactory {}));
- let cfg = RuntimeConfig::new().with_table_factories(table_factories);
+ let mut cfg = RuntimeConfig::new();
+ cfg.table_factories
+ .insert("MOCKTABLE".to_string(), Arc::new(TestTableFactory {}));
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
let ctx = SessionContext::with_config_rt(ses, Arc::new(env));
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 99a2365b4..2166b1d8b 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -208,7 +208,7 @@ mod roundtrip_tests {
async fn roundtrip_custom_tables() -> Result<(), DataFusionError> {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
HashMap::new();
- table_factories.insert("testtable".to_string(), Arc::new(TestTableFactory {}));
+ table_factories.insert("TESTTABLE".to_string(), Arc::new(TestTableFactory {}));
let cfg = RuntimeConfig::new().with_table_factories(table_factories);
let env = RuntimeEnv::new(cfg).unwrap();
let ses = SessionConfig::new();
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 9ab754c92..499e92911 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -570,17 +570,13 @@ impl AsLogicalPlan for LogicalPlanNode {
None
};
- match create_extern_table.file_type.as_str() {
- "CSV" | "JSON" | "PARQUET" | "AVRO" => {}
- it => {
- let env = &ctx.state.as_ref().read().runtime_env;
- if !env.table_factories.contains_key(it) {
- Err(DataFusionError::Internal(format!(
- "No TableProvider for file type: {}",
- it
- )))?
- }
- }
+ let file_type = create_extern_table.file_type.as_str();
+ let env = &ctx.state.as_ref().read().runtime_env;
+ if !env.table_factories.contains_key(file_type) {
+ Err(DataFusionError::Internal(format!(
+ "No TableProvider for file type: {}",
+ file_type
+ )))?
}
Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {