Posted to commits@arrow.apache.org by ag...@apache.org on 2022/11/17 15:17:05 UTC

[arrow-datafusion] branch master updated: refactor how we create listing tables (#4227)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new e18f7bac2 refactor how we create listing tables (#4227)
e18f7bac2 is described below

commit e18f7bac209987dcc95c1bfa01de9c13ef75efe4
Author: Tim Van Wassenhove <ti...@timvw.be>
AuthorDate: Thu Nov 17 16:16:59 2022 +0100

    refactor how we create listing tables (#4227)
    
    * refactor how we create listing tables
    
    * run linting
    
    * attempt to register default table factories
    
    * pass csv as catalog type
    
    * ensure that listingschemaprovider can keep working for csv with header
    
    * no need to register default table factories
    
    * remove unused imports
    
    * use builder for listingoptions
    
    * remove comment
    
    * fix roundtrip test
    
    * Update datafusion/core/src/datasource/listing_table_factory.rs
    
    Co-authored-by: Andy Grove <an...@gmail.com>
    
    * register a listingtablefactory for NDJSON
    
    * always verify that a factory is available
    
    * allow NDJSON as an alias for JSON
    
    Co-authored-by: Andy Grove <an...@gmail.com>
---
 datafusion-cli/src/main.rs                         |  26 +---
 datafusion/core/src/catalog/listing_schema.rs      |  12 +-
 .../core/src/datasource/file_format/file_type.rs   |   2 +-
 .../core/src/datasource/listing_table_factory.rs   |  69 ++++++++--
 datafusion/core/src/execution/context.rs           | 145 +++++++--------------
 datafusion/core/src/execution/runtime_env.rs       |  13 +-
 datafusion/core/tests/sql/create_drop.rs           |  16 +--
 datafusion/proto/src/lib.rs                        |   2 +-
 datafusion/proto/src/logical_plan.rs               |  18 +--
 9 files changed, 141 insertions(+), 162 deletions(-)
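
In practical terms, after this commit a plain SessionContext can serve
CREATE EXTERNAL TABLE for the built-in formats with no manual factory
registration, because RuntimeConfig::new() pre-registers a
ListingTableFactory under the PARQUET, CSV, JSON, NDJSON and AVRO keys.
A minimal sketch of the new behavior (the table name, schema and file
path are illustrative):

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        // No factory setup needed: RuntimeConfig::new() already maps
        // PARQUET/CSV/JSON/NDJSON/AVRO to a ListingTableFactory.
        let ctx = SessionContext::new();
        ctx.sql(
            "CREATE EXTERNAL TABLE example (a INT, b INT) \
             STORED AS CSV WITH HEADER ROW \
             LOCATION 'data/example.csv'",
        )
        .await?;
        ctx.sql("SELECT a, b FROM example").await?.show().await?;
        Ok(())
    }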

diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index f5c2efb22..a6a3daa2e 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
@@ -16,9 +16,6 @@
 // under the License.
 
 use clap::Parser;
-use datafusion::datasource::datasource::TableProviderFactory;
-use datafusion::datasource::file_format::file_type::FileType;
-use datafusion::datasource::listing_table_factory::ListingTableFactory;
 use datafusion::datasource::object_store::ObjectStoreRegistry;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::context::SessionConfig;
@@ -29,7 +26,6 @@ use datafusion_cli::{
     exec, print_format::PrintFormat, print_options::PrintOptions, DATAFUSION_CLI_VERSION,
 };
 use mimalloc::MiMalloc;
-use std::collections::HashMap;
 use std::env;
 use std::path::Path;
 use std::sync::Arc;
@@ -147,31 +143,11 @@ pub async fn main() -> Result<()> {
 }
 
 fn create_runtime_env() -> Result<RuntimeEnv> {
-    let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
-        HashMap::new();
-    table_factories.insert(
-        "csv".to_string(),
-        Arc::new(ListingTableFactory::new(FileType::CSV)),
-    );
-    table_factories.insert(
-        "parquet".to_string(),
-        Arc::new(ListingTableFactory::new(FileType::PARQUET)),
-    );
-    table_factories.insert(
-        "avro".to_string(),
-        Arc::new(ListingTableFactory::new(FileType::AVRO)),
-    );
-    table_factories.insert(
-        "json".to_string(),
-        Arc::new(ListingTableFactory::new(FileType::JSON)),
-    );
-
     let object_store_provider = DatafusionCliObjectStoreProvider {};
     let object_store_registry =
         ObjectStoreRegistry::new_with_provider(Some(Arc::new(object_store_provider)));
     let rn_config = RuntimeConfig::new()
-        .with_object_store_registry(Arc::new(object_store_registry))
-        .with_table_factories(table_factories);
+        .with_object_store_registry(Arc::new(object_store_registry));
     RuntimeEnv::new(rn_config)
 }
 
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index bac2724a0..6e8dcd5b2 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -49,6 +49,8 @@ pub struct ListingSchemaProvider {
     factory: Arc<dyn TableProviderFactory>,
     store: Arc<dyn ObjectStore>,
     tables: Arc<Mutex<HashMap<String, Arc<dyn TableProvider>>>>,
+    format: String,
+    has_header: bool,
 }
 
 impl ListingSchemaProvider {
@@ -59,11 +61,15 @@ impl ListingSchemaProvider {
     /// `path`: The root path that contains subfolders which represent tables
     /// `factory`: The `TableProviderFactory` to use to instantiate tables for each subfolder
     /// `store`: The `ObjectStore` containing the table data
+    /// `format`: The `FileFormat` of the tables
+    /// `has_header`: Indicates whether the created external table has the has_header flag enabled
     pub fn new(
         authority: String,
         path: object_store::path::Path,
         factory: Arc<dyn TableProviderFactory>,
         store: Arc<dyn ObjectStore>,
+        format: String,
+        has_header: bool,
     ) -> Self {
         Self {
             authority,
@@ -71,6 +77,8 @@ impl ListingSchemaProvider {
             factory,
             store,
             tables: Arc::new(Mutex::new(HashMap::new())),
+            format,
+            has_header,
         }
     }
 
@@ -118,8 +126,8 @@ impl ListingSchemaProvider {
                             schema: Arc::new(DFSchema::empty()),
                             name: table_name.to_string(),
                             location: table_url,
-                            file_type: "".to_string(),
-                            has_header: false,
+                            file_type: self.format.clone(),
+                            has_header: self.has_header,
                             delimiter: ',',
                             table_partition_cols: vec![],
                             if_not_exists: false,
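
For reference, a sketch of constructing the provider directly with the
two new arguments, assuming the module paths shown in this diff
(authority, path and format values are illustrative; normally
SessionState wires this up from the datafusion.catalog.* settings):

    use std::sync::Arc;
    use datafusion::catalog::listing_schema::ListingSchemaProvider;
    use datafusion::datasource::listing_table_factory::ListingTableFactory;
    use object_store::local::LocalFileSystem;

    // Each subdirectory of /data/tables is exposed as a table named after it.
    let schema = ListingSchemaProvider::new(
        "file://".to_string(),                          // authority
        object_store::path::Path::from("/data/tables"), // root path
        Arc::new(ListingTableFactory::new()),           // factory per subfolder
        Arc::new(LocalFileSystem::new()),               // store
        "CSV".to_string(),                              // format: new in this commit
        true,                                           // has_header: new in this commit
    );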
diff --git a/datafusion/core/src/datasource/file_format/file_type.rs b/datafusion/core/src/datasource/file_format/file_type.rs
index f08a21ca1..78f6dbc9a 100644
--- a/datafusion/core/src/datasource/file_format/file_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_type.rs
@@ -158,7 +158,7 @@ impl FromStr for FileType {
             "AVRO" => Ok(FileType::AVRO),
             "PARQUET" => Ok(FileType::PARQUET),
             "CSV" => Ok(FileType::CSV),
-            "JSON" => Ok(FileType::JSON),
+            "JSON" | "NDJSON" => Ok(FileType::JSON),
             _ => Err(DataFusionError::NotImplemented(format!(
                 "Unknown FileType: {}",
                 s
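
A quick check of the new alias, as a sketch:

    use std::str::FromStr;
    use datafusion::datasource::file_format::file_type::FileType;

    // "NDJSON" now parses to the same variant as "JSON".
    let t = FileType::from_str("NDJSON").unwrap();
    assert!(matches!(t, FileType::JSON));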
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 7bd798f8e..3662ab361 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -20,7 +20,7 @@
 use crate::datasource::datasource::TableProviderFactory;
 use crate::datasource::file_format::avro::AvroFormat;
 use crate::datasource::file_format::csv::CsvFormat;
-use crate::datasource::file_format::file_type::{FileType, GetExt};
+use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::file_format::json::JsonFormat;
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::file_format::FileFormat;
@@ -30,18 +30,24 @@ use crate::datasource::listing::{
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
 use async_trait::async_trait;
+use datafusion_common::DataFusionError;
 use datafusion_expr::CreateExternalTable;
+use std::str::FromStr;
 use std::sync::Arc;
 
 /// A `TableProviderFactory` capable of creating new `ListingTable`s
-pub struct ListingTableFactory {
-    file_type: FileType,
-}
+pub struct ListingTableFactory {}
 
 impl ListingTableFactory {
     /// Creates a new `ListingTableFactory`
-    pub fn new(file_type: FileType) -> Self {
-        Self { file_type }
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for ListingTableFactory {
+    fn default() -> Self {
+        Self::new()
     }
 }
 
@@ -52,24 +58,59 @@ impl TableProviderFactory for ListingTableFactory {
         state: &SessionState,
         cmd: &CreateExternalTable,
     ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
-        let file_extension = self.file_type.get_ext();
+        let file_compression_type = FileCompressionType::from_str(
+            cmd.file_compression_type.as_str(),
+        )
+        .map_err(|_| {
+            DataFusionError::Execution(format!(
+                "Unknown FileCompressionType {}",
+                cmd.file_compression_type.as_str()
+            ))
+        })?;
+        let file_type = FileType::from_str(cmd.file_type.as_str()).map_err(|_| {
+            DataFusionError::Execution(format!("Unknown FileType {}", cmd.file_type))
+        })?;
 
-        let file_format: Arc<dyn FileFormat> = match self.file_type {
-            FileType::CSV => Arc::new(CsvFormat::default()),
+        let file_extension =
+            file_type.get_ext_with_compression(file_compression_type.to_owned())?;
+
+        let file_format: Arc<dyn FileFormat> = match file_type {
+            FileType::CSV => Arc::new(
+                CsvFormat::default()
+                    .with_has_header(cmd.has_header)
+                    .with_delimiter(cmd.delimiter as u8)
+                    .with_file_compression_type(file_compression_type),
+            ),
             FileType::PARQUET => Arc::new(ParquetFormat::default()),
             FileType::AVRO => Arc::new(AvroFormat::default()),
-            FileType::JSON => Arc::new(JsonFormat::default()),
+            FileType::JSON => Arc::new(
+                JsonFormat::default().with_file_compression_type(file_compression_type),
+            ),
+        };
+
+        let provided_schema = if cmd.schema.fields().is_empty() {
+            None
+        } else {
+            Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
         };
 
-        let options =
-            ListingOptions::new(file_format).with_file_extension(file_extension);
+        let options = ListingOptions::new(file_format)
+            .with_collect_stat(state.config.collect_statistics)
+            .with_file_extension(file_extension)
+            .with_target_partitions(state.config.target_partitions)
+            .with_table_partition_cols(cmd.table_partition_cols.clone())
+            .with_file_sort_order(None);
 
         let table_path = ListingTableUrl::parse(&cmd.location)?;
-        let resolved_schema = options.infer_schema(state, &table_path).await?;
+        let resolved_schema = match provided_schema {
+            None => options.infer_schema(state, &table_path).await?,
+            Some(s) => s,
+        };
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
             .with_schema(resolved_schema);
-        let table = ListingTable::try_new(config)?;
+        let table =
+            ListingTable::try_new(config)?.with_definition(cmd.definition.clone());
         Ok(Arc::new(table))
     }
 }
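
The factory now derives everything from the CreateExternalTable command
rather than from per-factory state, and configures the listing through
the ListingOptions builder. The same builder pattern in isolation, as a
sketch with hardcoded values for illustration:

    use std::sync::Arc;
    use datafusion::datasource::file_format::csv::CsvFormat;
    use datafusion::datasource::listing::ListingOptions;

    // What the factory does for an uncompressed CSV table, spelled out:
    let format = Arc::new(
        CsvFormat::default()
            .with_has_header(true)
            .with_delimiter(b','),
    );
    let options = ListingOptions::new(format)
        .with_file_extension(".csv")
        .with_target_partitions(8);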
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 216e49f4a..b793c9c68 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -22,13 +22,7 @@ use crate::{
         information_schema::CatalogWithInformationSchema,
     },
     datasource::listing::{ListingOptions, ListingTable},
-    datasource::{
-        file_format::{
-            avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
-            FileFormat,
-        },
-        MemTable, ViewTable,
-    },
+    datasource::{MemTable, ViewTable},
     logical_expr::{PlanType, ToStringifiedPlan},
     optimizer::optimizer::Optimizer,
     physical_optimizer::{
@@ -79,7 +73,6 @@ use crate::config::{
     ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
     OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_MAX_PASSES, OPT_OPTIMIZER_SKIP_FAILED_RULES,
 };
-use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::execution::{runtime_env::RuntimeEnv, FunctionRegistry};
 use crate::physical_optimizer::enforcement::BasicEnforcement;
 use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -252,12 +245,9 @@ impl SessionContext {
     pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
         let plan = self.create_logical_plan(sql)?;
         match plan {
-            LogicalPlan::CreateExternalTable(cmd) => match cmd.file_type.as_str() {
-                "PARQUET" | "CSV" | "JSON" | "AVRO" => {
-                    self.create_listing_table(&cmd).await
-                }
-                _ => self.create_custom_table(&cmd).await,
-            },
+            LogicalPlan::CreateExternalTable(cmd) => {
+                self.create_external_table(&cmd).await
+            }
 
             LogicalPlan::CreateMemoryTable(CreateMemoryTable {
                 name,
@@ -490,88 +480,18 @@ impl SessionContext {
         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
     }
 
-    async fn create_custom_table(
-        &self,
-        cmd: &CreateExternalTable,
-    ) -> Result<Arc<DataFrame>> {
-        let state = self.state.read().clone();
-        let file_type = cmd.file_type.to_lowercase();
-        let factory = &state
-            .runtime_env
-            .table_factories
-            .get(file_type.as_str())
-            .ok_or_else(|| {
-                DataFusionError::Execution(format!(
-                    "Unable to find factory for {}",
-                    cmd.file_type
-                ))
-            })?;
-        let table = (*factory).create(&state, cmd).await?;
-        self.register_table(cmd.name.as_str(), table)?;
-        let plan = LogicalPlanBuilder::empty(false).build()?;
-        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
-    }
-
-    async fn create_listing_table(
+    async fn create_external_table(
         &self,
         cmd: &CreateExternalTable,
     ) -> Result<Arc<DataFrame>> {
-        let file_compression_type =
-            match FileCompressionType::from_str(cmd.file_compression_type.as_str()) {
-                Ok(t) => t,
-                Err(_) => Err(DataFusionError::Execution(
-                    "Only known FileCompressionTypes can be ListingTables!".to_string(),
-                ))?,
-            };
-
-        let file_type = match FileType::from_str(cmd.file_type.as_str()) {
-            Ok(t) => t,
-            Err(_) => Err(DataFusionError::Execution(
-                "Only known FileTypes can be ListingTables!".to_string(),
-            ))?,
-        };
-
-        let file_extension =
-            file_type.get_ext_with_compression(file_compression_type.to_owned())?;
-
-        let file_format: Arc<dyn FileFormat> = match file_type {
-            FileType::CSV => Arc::new(
-                CsvFormat::default()
-                    .with_has_header(cmd.has_header)
-                    .with_delimiter(cmd.delimiter as u8)
-                    .with_file_compression_type(file_compression_type),
-            ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
-            FileType::AVRO => Arc::new(AvroFormat::default()),
-            FileType::JSON => Arc::new(
-                JsonFormat::default().with_file_compression_type(file_compression_type),
-            ),
-        };
+        let table_provider: Arc<dyn TableProvider> =
+            self.create_custom_table(cmd).await?;
 
         let table = self.table(cmd.name.as_str());
         match (cmd.if_not_exists, table) {
             (true, Ok(_)) => self.return_empty_dataframe(),
             (_, Err(_)) => {
-                // TODO make schema in CreateExternalTable optional instead of empty
-                let provided_schema = if cmd.schema.fields().is_empty() {
-                    None
-                } else {
-                    Some(Arc::new(cmd.schema.as_ref().to_owned().into()))
-                };
-                let options = ListingOptions::new(file_format)
-                    .with_collect_stat(self.copied_config().collect_statistics)
-                    .with_file_extension(file_extension)
-                    .with_target_partitions(self.copied_config().target_partitions)
-                    .with_table_partition_cols(cmd.table_partition_cols.clone());
-
-                self.register_listing_table(
-                    cmd.name.as_str(),
-                    cmd.location.clone(),
-                    options,
-                    provided_schema,
-                    cmd.definition.clone(),
-                )
-                .await?;
+                self.register_table(cmd.name.as_str(), table_provider)?;
                 self.return_empty_dataframe()
             }
             (false, Ok(_)) => Err(DataFusionError::Execution(format!(
@@ -581,6 +501,26 @@ impl SessionContext {
         }
     }
 
+    async fn create_custom_table(
+        &self,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let state = self.state.read().clone();
+        let file_type = cmd.file_type.to_uppercase();
+        let factory = &state
+            .runtime_env
+            .table_factories
+            .get(file_type.as_str())
+            .ok_or_else(|| {
+                DataFusionError::Execution(format!(
+                    "Unable to find factory for {}",
+                    cmd.file_type
+                ))
+            })?;
+        let table = (*factory).create(&state, cmd).await?;
+        Ok(table)
+    }
+
     fn find_and_deregister<'a>(
         &self,
         table_ref: impl Into<TableReference<'a>>,
@@ -1625,6 +1565,15 @@ impl SessionState {
         }
         let url = url.to_string();
         let format = format.to_string();
+
+        let has_header = config
+            .config_options
+            .read()
+            .get("datafusion.catalog.has_header");
+        let has_header: bool = has_header
+            .map(|x| FromStr::from_str(&x.to_string()).unwrap_or_default())
+            .unwrap_or_default();
+
         let url = Url::parse(url.as_str()).expect("Invalid default catalog location!");
         let authority = match url.host_str() {
             Some(host) => format!("{}://{}", url.scheme(), host),
@@ -1642,7 +1591,14 @@ impl SessionState {
             Some(factory) => factory,
             _ => return,
         };
-        let schema = ListingSchemaProvider::new(authority, path, factory.clone(), store);
+        let schema = ListingSchemaProvider::new(
+            authority,
+            path,
+            factory.clone(),
+            store,
+            format,
+            has_header,
+        );
         let _ = default_catalog
             .register_schema("default", Arc::new(schema))
             .expect("Failed to register default schema");
@@ -2033,8 +1989,6 @@ impl FunctionRegistry for TaskContext {
 mod tests {
     use super::*;
     use crate::assert_batches_eq;
-    use crate::datasource::datasource::TableProviderFactory;
-    use crate::datasource::listing_table_factory::ListingTableFactory;
     use crate::execution::context::QueryPlanner;
     use crate::execution::runtime_env::RuntimeConfig;
     use crate::physical_plan::expressions::AvgAccumulator;
@@ -2295,15 +2249,12 @@ mod tests {
         let path = path.join("tests/tpch-csv");
         let url = format!("file://{}", path.display());
 
-        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
-            HashMap::new();
-        let factory = Arc::new(ListingTableFactory::new(FileType::CSV));
-        table_factories.insert("test".to_string(), factory);
-        let rt_cfg = RuntimeConfig::new().with_table_factories(table_factories);
+        let rt_cfg = RuntimeConfig::new();
         let runtime = Arc::new(RuntimeEnv::new(rt_cfg).unwrap());
         let cfg = SessionConfig::new()
             .set_str("datafusion.catalog.location", url.as_str())
-            .set_str("datafusion.catalog.type", "test");
+            .set_str("datafusion.catalog.type", "CSV")
+            .set_str("datafusion.catalog.has_header", "true");
         let session_state = SessionState::with_config_rt(cfg, runtime);
         let ctx = SessionContext::with_state(session_state);
         ctx.refresh_catalogs().await?;
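
Outside of tests, the same directory-backed catalog can be enabled on a
session with three settings, mirroring the test above (the location URL
is illustrative):

    use std::sync::Arc;
    use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).unwrap());
    let cfg = SessionConfig::new()
        .set_str("datafusion.catalog.location", "file:///data/tables")
        .set_str("datafusion.catalog.type", "CSV")
        .set_str("datafusion.catalog.has_header", "true");
    let state = SessionState::with_config_rt(cfg, runtime);
    let ctx = SessionContext::with_state(state);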
diff --git a/datafusion/core/src/execution/runtime_env.rs b/datafusion/core/src/execution/runtime_env.rs
index 7c3e9b4e6..64da4a103 100644
--- a/datafusion/core/src/execution/runtime_env.rs
+++ b/datafusion/core/src/execution/runtime_env.rs
@@ -28,6 +28,7 @@ use crate::{
 use std::collections::HashMap;
 
 use crate::datasource::datasource::TableProviderFactory;
+use crate::datasource::listing_table_factory::ListingTableFactory;
 use crate::datasource::object_store::ObjectStoreRegistry;
 use datafusion_common::DataFusionError;
 use object_store::ObjectStore;
@@ -152,7 +153,17 @@ pub struct RuntimeConfig {
 impl RuntimeConfig {
     /// New with default values
     pub fn new() -> Self {
-        Default::default()
+        let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
+            HashMap::new();
+        table_factories.insert("PARQUET".into(), Arc::new(ListingTableFactory::new()));
+        table_factories.insert("CSV".into(), Arc::new(ListingTableFactory::new()));
+        table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new()));
+        table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new()));
+        table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new()));
+        Self {
+            table_factories,
+            ..Default::default()
+        }
     }
 
     /// Customize disk manager
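
Callers that previously built their own map and passed it through
with_table_factories now start from a populated map and only add what is
missing, e.g. (the DELTATABLE key mirrors the test change below):

    use std::sync::Arc;
    use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
    use datafusion::test_util::TestTableFactory;

    // PARQUET/CSV/JSON/NDJSON/AVRO are pre-registered; add a custom factory.
    let mut cfg = RuntimeConfig::new();
    cfg.table_factories
        .insert("DELTATABLE".to_string(), Arc::new(TestTableFactory {}));
    let env = RuntimeEnv::new(cfg).unwrap();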
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 143c53f18..452f41b7d 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -15,10 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::HashMap;
 use std::io::Write;
 
-use datafusion::datasource::datasource::TableProviderFactory;
 use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
 use datafusion::test_util::TestTableFactory;
 use tempfile::TempDir;
@@ -433,10 +431,9 @@ async fn create_pipe_delimited_csv_table() -> Result<()> {
 
 #[tokio::test]
 async fn create_custom_table() -> Result<()> {
-    let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
-        HashMap::new();
-    table_factories.insert("deltatable".to_string(), Arc::new(TestTableFactory {}));
-    let cfg = RuntimeConfig::new().with_table_factories(table_factories);
+    let mut cfg = RuntimeConfig::new();
+    cfg.table_factories
+        .insert("DELTATABLE".to_string(), Arc::new(TestTableFactory {}));
     let env = RuntimeEnv::new(cfg).unwrap();
     let ses = SessionConfig::new();
     let ctx = SessionContext::with_config_rt(ses, Arc::new(env));
@@ -454,10 +451,9 @@ async fn create_custom_table() -> Result<()> {
 
 #[tokio::test]
 async fn create_external_table_with_ddl() -> Result<()> {
-    let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
-        HashMap::new();
-    table_factories.insert("mocktable".to_string(), Arc::new(TestTableFactory {}));
-    let cfg = RuntimeConfig::new().with_table_factories(table_factories);
+    let mut cfg = RuntimeConfig::new();
+    cfg.table_factories
+        .insert("MOCKTABLE".to_string(), Arc::new(TestTableFactory {}));
     let env = RuntimeEnv::new(cfg).unwrap();
     let ses = SessionConfig::new();
     let ctx = SessionContext::with_config_rt(ses, Arc::new(env));
diff --git a/datafusion/proto/src/lib.rs b/datafusion/proto/src/lib.rs
index 99a2365b4..2166b1d8b 100644
--- a/datafusion/proto/src/lib.rs
+++ b/datafusion/proto/src/lib.rs
@@ -208,7 +208,7 @@ mod roundtrip_tests {
     async fn roundtrip_custom_tables() -> Result<(), DataFusionError> {
         let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
             HashMap::new();
-        table_factories.insert("testtable".to_string(), Arc::new(TestTableFactory {}));
+        table_factories.insert("TESTTABLE".to_string(), Arc::new(TestTableFactory {}));
         let cfg = RuntimeConfig::new().with_table_factories(table_factories);
         let env = RuntimeEnv::new(cfg).unwrap();
         let ses = SessionConfig::new();
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 9ab754c92..499e92911 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -570,17 +570,13 @@ impl AsLogicalPlan for LogicalPlanNode {
                     None
                 };
 
-                match create_extern_table.file_type.as_str() {
-                    "CSV" | "JSON" | "PARQUET" | "AVRO" => {}
-                    it => {
-                        let env = &ctx.state.as_ref().read().runtime_env;
-                        if !env.table_factories.contains_key(it) {
-                            Err(DataFusionError::Internal(format!(
-                                "No TableProvider for file type: {}",
-                                it
-                            )))?
-                        }
-                    }
+                let file_type = create_extern_table.file_type.as_str();
+                let env = &ctx.state.as_ref().read().runtime_env;
+                if !env.table_factories.contains_key(file_type) {
+                    Err(DataFusionError::Internal(format!(
+                        "No TableProvider for file type: {}",
+                        file_type
+                    )))?
                 }
 
                 Ok(LogicalPlan::CreateExternalTable(CreateExternalTable {