Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/08 21:42:29 UTC

[arrow-datafusion] branch master updated: extend CreateExternalTable to allow for options. Rework TableProviderFactory to receive entire cmd (#4126)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 2a834d16a extend CreateExternalTable to allow for options. Rework TableProviderFactory to receive entire cmd (#4126)
2a834d16a is described below

commit 2a834d16ab54a1f0ad1b4cf826a0d1421dab9ae3
Author: Tim Van Wassenhove <ti...@timvw.be>
AuthorDate: Tue Nov 8 22:42:23 2022 +0100

    extend CreateExternalTable to allow for options. Rework TableProviderFactory to receive entire cmd (#4126)
---
 datafusion/core/src/catalog/listing_schema.rs      | 26 +++++--
 datafusion/core/src/datasource/datasource.rs       |  4 +-
 .../core/src/datasource/listing_table_factory.rs   |  5 +-
 datafusion/core/src/execution/context.rs           |  2 +-
 datafusion/core/src/test_util.rs                   |  8 +-
 datafusion/expr/src/logical_plan/plan.rs           |  4 +-
 datafusion/proto/proto/datafusion.proto            |  1 +
 datafusion/proto/src/generated/pbjson.rs           | 19 +++++
 datafusion/proto/src/generated/prost.rs            |  2 +
 datafusion/proto/src/logical_plan.rs               |  3 +
 datafusion/sql/src/parser.rs                       | 88 ++++++++++++++++++++++
 datafusion/sql/src/planner.rs                      |  2 +
 12 files changed, 148 insertions(+), 16 deletions(-)
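
For context, the end-user-visible change is a new OPTIONS clause on CREATE EXTERNAL TABLE. A minimal example of the syntax accepted by the reworked parser (the key/value pairs are arbitrary placeholders; their meaning is entirely up to the TableProviderFactory that eventually receives the command):

    CREATE EXTERNAL TABLE t
    STORED AS CSV
    OPTIONS ('k1' 'v1', 'k2' 'v2')
    LOCATION 'foo.csv'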

diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index 0a92590f6..bac2724a0 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -20,7 +20,8 @@ use crate::catalog::schema::SchemaProvider;
 use crate::datasource::datasource::TableProviderFactory;
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
-use datafusion_common::{context, DataFusionError};
+use datafusion_common::{DFSchema, DataFusionError};
+use datafusion_expr::CreateExternalTable;
 use futures::TryStreamExt;
 use itertools::Itertools;
 use object_store::ObjectStore;
@@ -108,13 +109,26 @@ impl ListingSchemaProvider {
             })?;
             if !self.table_exist(table_name) {
                 let table_url = format!("{}/{}", self.authority, table_path);
+
                 let provider = self
                     .factory
-                    .create(state, table_url.as_str())
-                    .await
-                    .map_err(|e| {
-                        context!(format!("Could not create table for {}", table_url), e)
-                    })?;
+                    .create(
+                        state,
+                        &CreateExternalTable {
+                            schema: Arc::new(DFSchema::empty()),
+                            name: table_name.to_string(),
+                            location: table_url,
+                            file_type: "".to_string(),
+                            has_header: false,
+                            delimiter: ',',
+                            table_partition_cols: vec![],
+                            if_not_exists: false,
+                            definition: None,
+                            file_compression_type: "".to_string(),
+                            options: Default::default(),
+                        },
+                    )
+                    .await?;
                 let _ = self.register_table(table_name.to_string(), provider.clone())?;
             }
         }
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 873b11bb2..5a2ba4c2e 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 
 use async_trait::async_trait;
 use datafusion_common::Statistics;
-use datafusion_expr::LogicalPlan;
+use datafusion_expr::{CreateExternalTable, LogicalPlan};
 pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
 
 use crate::arrow::datatypes::SchemaRef;
@@ -95,6 +95,6 @@ pub trait TableProviderFactory: Sync + Send {
     async fn create(
         &self,
         ctx: &SessionState,
-        url: &str,
+        cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>>;
 }
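
With the trait now receiving the whole command, a downstream factory can read provider-specific settings from cmd.options. A minimal sketch, not part of this commit: MyCustomFactory and the "compression" key are made up for illustration, and EmptyTable only keeps the example self-contained where a real factory would build its own provider from cmd.location and cmd.options.

    use std::sync::Arc;

    use async_trait::async_trait;
    use datafusion::arrow::datatypes::Schema;
    use datafusion::datasource::datasource::TableProviderFactory;
    use datafusion::datasource::{empty::EmptyTable, TableProvider};
    use datafusion::execution::context::SessionState;
    use datafusion_expr::CreateExternalTable;

    struct MyCustomFactory {}

    #[async_trait]
    impl TableProviderFactory for MyCustomFactory {
        async fn create(
            &self,
            _state: &SessionState,
            cmd: &CreateExternalTable,
        ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
            // Provider-specific settings now travel with the command instead of
            // being squeezed into the bare location string.
            let _compression = cmd.options.get("compression").cloned();
            // A real factory would build its provider from cmd.location and
            // cmd.options; EmptyTable just keeps this sketch compilable.
            Ok(Arc::new(EmptyTable::new(Arc::new(Schema::empty()))))
        }
    }
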
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 117ab8fb2..253172738 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -30,6 +30,7 @@ use crate::datasource::listing::{
 use crate::datasource::TableProvider;
 use crate::execution::context::SessionState;
 use async_trait::async_trait;
+use datafusion_expr::CreateExternalTable;
 use std::sync::Arc;
 
 /// A `TableProviderFactory` capable of creating new `ListingTable`s
@@ -49,7 +50,7 @@ impl TableProviderFactory for ListingTableFactory {
     async fn create(
         &self,
         state: &SessionState,
-        url: &str,
+        cmd: &CreateExternalTable,
     ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
         let file_extension = self.file_type.get_ext();
 
@@ -68,7 +69,7 @@ impl TableProviderFactory for ListingTableFactory {
             table_partition_cols: vec![],
         };
 
-        let table_path = ListingTableUrl::parse(url)?;
+        let table_path = ListingTableUrl::parse(&cmd.location)?;
         let resolved_schema = options.infer_schema(state, &table_path).await?;
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 553a208b8..8f5210998 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -506,7 +506,7 @@ impl SessionContext {
                     cmd.file_type
                 ))
             })?;
-        let table = (*factory).create(&state, cmd.location.as_str()).await?;
+        let table = (*factory).create(&state, cmd).await?;
         self.register_table(cmd.name.as_str(), table)?;
         let plan = LogicalPlanBuilder::empty(false).build()?;
         Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 1510e8785..b97afe832 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::ExecutionPlan;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use async_trait::async_trait;
 use datafusion_common::DataFusionError;
-use datafusion_expr::{Expr, TableType};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
 
 /// Compares formatted output of a record batch with an expected
 /// vector of strings, with the result of pretty formatting record
@@ -284,11 +284,11 @@ pub struct TestTableFactory {}
 impl TableProviderFactory for TestTableFactory {
     async fn create(
         &self,
-        _state: &SessionState,
-        url: &str,
+        _: &SessionState,
+        cmd: &CreateExternalTable,
     ) -> datafusion_common::Result<Arc<dyn TableProvider>> {
         Ok(Arc::new(TestTableProvider {
-            url: url.to_string(),
+            url: cmd.location.to_string(),
         }))
     }
 }
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 60d9faede..6ad643fd1 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -26,7 +26,7 @@ use crate::utils::{
 use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource};
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
 use std::fmt::{self, Debug, Display, Formatter};
 use std::hash::{Hash, Hasher};
 use std::sync::Arc;
@@ -1338,6 +1338,8 @@ pub struct CreateExternalTable {
     pub definition: Option<String>,
     /// File compression type (GZIP, BZIP2)
     pub file_compression_type: String,
+    /// Table-provider-specific options
+    pub options: HashMap<String, String>,
 }
 
 /// Produces a relation with string representations of
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index e40734538..de5f3749d 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -176,6 +176,7 @@ message CreateExternalTableNode {
   string delimiter = 8;
   string definition = 9;
   string file_compression_type = 10;
+  map<string, string> options = 11;
 }
 
 message CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 590ce61a2..fe67b590a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -2391,6 +2391,9 @@ impl serde::Serialize for CreateExternalTableNode {
         if !self.file_compression_type.is_empty() {
             len += 1;
         }
+        if !self.options.is_empty() {
+            len += 1;
+        }
         let mut struct_ser = serializer.serialize_struct("datafusion.CreateExternalTableNode", len)?;
         if !self.name.is_empty() {
             struct_ser.serialize_field("name", &self.name)?;
@@ -2422,6 +2425,9 @@ impl serde::Serialize for CreateExternalTableNode {
         if !self.file_compression_type.is_empty() {
             struct_ser.serialize_field("fileCompressionType", &self.file_compression_type)?;
         }
+        if !self.options.is_empty() {
+            struct_ser.serialize_field("options", &self.options)?;
+        }
         struct_ser.end()
     }
 }
@@ -2447,6 +2453,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
             "definition",
             "file_compression_type",
             "fileCompressionType",
+            "options",
         ];
 
         #[allow(clippy::enum_variant_names)]
@@ -2461,6 +2468,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
             Delimiter,
             Definition,
             FileCompressionType,
+            Options,
         }
         impl<'de> serde::Deserialize<'de> for GeneratedField {
             fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -2492,6 +2500,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                             "delimiter" => Ok(GeneratedField::Delimiter),
                             "definition" => Ok(GeneratedField::Definition),
                             "fileCompressionType" | "file_compression_type" => Ok(GeneratedField::FileCompressionType),
+                            "options" => Ok(GeneratedField::Options),
                             _ => Err(serde::de::Error::unknown_field(value, FIELDS)),
                         }
                     }
@@ -2521,6 +2530,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                 let mut delimiter__ = None;
                 let mut definition__ = None;
                 let mut file_compression_type__ = None;
+                let mut options__ = None;
                 while let Some(k) = map.next_key()? {
                     match k {
                         GeneratedField::Name => {
@@ -2583,6 +2593,14 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                             }
                             file_compression_type__ = Some(map.next_value()?);
                         }
+                        GeneratedField::Options => {
+                            if options__.is_some() {
+                                return Err(serde::de::Error::duplicate_field("options"));
+                            }
+                            options__ = Some(
+                                map.next_value::<std::collections::HashMap<_, _>>()?
+                            );
+                        }
                     }
                 }
                 Ok(CreateExternalTableNode {
@@ -2596,6 +2614,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
                     delimiter: delimiter__.unwrap_or_default(),
                     definition: definition__.unwrap_or_default(),
                     file_compression_type: file_compression_type__.unwrap_or_default(),
+                    options: options__.unwrap_or_default(),
                 })
             }
         }
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 09177845a..8962a7a8a 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -259,6 +259,8 @@ pub struct CreateExternalTableNode {
     pub definition: ::prost::alloc::string::String,
     #[prost(string, tag="10")]
     pub file_compression_type: ::prost::alloc::string::String,
+    #[prost(map="string, string", tag="11")]
+    pub options: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
 }
 #[derive(Clone, PartialEq, ::prost::Message)]
 pub struct CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 4f71bdac5..ba0ed5eb9 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -585,6 +585,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                     if_not_exists: create_extern_table.if_not_exists,
                     file_compression_type: create_extern_table.file_compression_type.to_string(),
                     definition,
+                    options: create_extern_table.options.clone(),
                 }))
             }
             LogicalPlanType::CreateView(create_view) => {
@@ -1197,6 +1198,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 if_not_exists,
                 definition,
                 file_compression_type,
+                options,
             }) => Ok(protobuf::LogicalPlanNode {
                 logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
                     protobuf::CreateExternalTableNode {
@@ -1210,6 +1212,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                         delimiter: String::from(*delimiter),
                         definition: definition.clone().unwrap_or_default(),
                         file_compression_type: file_compression_type.to_string(),
+                        options: options.clone(),
                     },
                 )),
             }),
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index cc9078e2a..fe0c24b46 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -25,6 +25,7 @@ use sqlparser::{
     parser::{Parser, ParserError},
     tokenizer::{Token, Tokenizer},
 };
+use std::collections::HashMap;
 use std::{collections::VecDeque, fmt};
 
 // Use `Parser::expected` instead, if possible
@@ -63,6 +64,8 @@ pub struct CreateExternalTable {
     pub if_not_exists: bool,
     /// File compression type (GZIP, BZIP2)
     pub file_compression_type: String,
+    /// Table-provider-specific options
+    pub options: HashMap<String, String>,
 }
 
 impl fmt::Display for CreateExternalTable {
@@ -348,6 +351,12 @@ impl<'a> DFParser<'a> {
             vec![]
         };
 
+        let options = if self.parse_has_options() {
+            self.parse_options()?
+        } else {
+            HashMap::new()
+        };
+
         self.parser.expect_keyword(Keyword::LOCATION)?;
         let location = self.parser.parse_literal_string()?;
 
@@ -361,6 +370,7 @@ impl<'a> DFParser<'a> {
             table_partition_cols,
             if_not_exists,
             file_compression_type,
+            options,
         };
         Ok(Statement::CreateExternalTable(create))
     }
@@ -381,6 +391,33 @@ impl<'a> DFParser<'a> {
         }
     }
 
+    fn parse_has_options(&mut self) -> bool {
+        self.consume_token(&Token::make_keyword("OPTIONS"))
+    }
+
+    /// Parses the key/value pairs of an `OPTIONS (...)` clause into a map
+    fn parse_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
+        let mut options: HashMap<String, String> = HashMap::new();
+        self.parser.expect_token(&Token::LParen)?;
+
+        loop {
+            let key = self.parser.parse_literal_string()?;
+            let value = self.parser.parse_literal_string()?;
+            options.insert(key.to_string(), value.to_string());
+            let comma = self.parser.consume_token(&Token::Comma);
+            if self.parser.consume_token(&Token::RParen) {
+                // allow a trailing comma, even though it's not standard SQL
+                break;
+            } else if !comma {
+                return self.expected(
+                    "',' or ')' after option definition",
+                    self.parser.peek_token(),
+                );
+            }
+        }
+        Ok(options)
+    }
+
     fn consume_token(&mut self, expected: &Token) -> bool {
         let token = self.parser.peek_token().to_string().to_uppercase();
         let token = Token::make_keyword(&token);
@@ -486,6 +523,7 @@ mod tests {
             table_partition_cols: vec![],
             if_not_exists: false,
             file_compression_type: "".to_string(),
+            options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -502,6 +540,7 @@ mod tests {
             table_partition_cols: vec![],
             if_not_exists: false,
             file_compression_type: "".to_string(),
+            options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -518,6 +557,7 @@ mod tests {
             table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
             if_not_exists: false,
             file_compression_type: "".to_string(),
+            options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -537,6 +577,7 @@ mod tests {
                 table_partition_cols: vec![],
                 if_not_exists: false,
                 file_compression_type: "".to_string(),
+                options: HashMap::new(),
             });
             expect_parse_ok(sql, expected)?;
         }
@@ -557,6 +598,7 @@ mod tests {
                 table_partition_cols: vec![],
                 if_not_exists: false,
                 file_compression_type: file_compression_type.to_owned(),
+                options: HashMap::new(),
             });
             expect_parse_ok(sql, expected)?;
         }
@@ -573,6 +615,7 @@ mod tests {
             table_partition_cols: vec![],
             if_not_exists: false,
             file_compression_type: "".to_string(),
+            options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -588,6 +631,7 @@ mod tests {
             table_partition_cols: vec![],
             if_not_exists: false,
             file_compression_type: "".to_string(),
+            options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -603,6 +647,7 @@ mod tests {
             table_partition_cols: vec![],
             if_not_exists: false,
             file_compression_type: "".to_string(),
+            options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -619,6 +664,7 @@ mod tests {
             table_partition_cols: vec![],
             if_not_exists: true,
             file_compression_type: "".to_string(),
+            options: HashMap::new(),
         });
         expect_parse_ok(sql, expected)?;
 
@@ -627,6 +673,48 @@ mod tests {
             "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
         expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int");
 
+        // positive case: additional options (one entry) can be specified
+        let sql =
+            "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1') LOCATION 'blahblah'";
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![],
+            file_type: "X".to_string(),
+            has_header: false,
+            delimiter: ',',
+            location: "blahblah".into(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            file_compression_type: "".to_string(),
+            options: HashMap::from([("k1".into(), "v1".into())]),
+        });
+        expect_parse_ok(sql, expected)?;
+
+        // positive case: additional options (multiple entries) can be specified
+        let sql =
+            "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1', k2 v2) LOCATION 'blahblah'";
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![],
+            file_type: "X".to_string(),
+            has_header: false,
+            delimiter: ',',
+            location: "blahblah".into(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+            file_compression_type: "".to_string(),
+            options: HashMap::from([
+                ("k1".into(), "v1".into()),
+                ("k2".into(), "v2".into()),
+            ]),
+        });
+        expect_parse_ok(sql, expected)?;
+
+        // negative case: an option key without a value is rejected
+        let sql =
+            "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1', k2 v2, k3) LOCATION 'blahblah'";
+        expect_parse_error(sql, "sql parser error: Expected literal string, found: )");
+
         Ok(())
     }
 }
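
For completeness, a minimal sketch of driving the new clause through DFParser directly, mirroring the tests above ('k1'/'v1' are placeholder option strings with no built-in meaning):

    use datafusion_sql::parser::{DFParser, Statement};
    use sqlparser::parser::ParserError;

    fn main() -> Result<(), ParserError> {
        let sql =
            "CREATE EXTERNAL TABLE t STORED AS CSV OPTIONS ('k1' 'v1') LOCATION 'foo.csv'";
        let mut statements = DFParser::parse_sql(sql)?;
        if let Some(Statement::CreateExternalTable(cmd)) = statements.pop_front() {
            // The OPTIONS clause surfaces as a HashMap<String, String> on the
            // parsed statement and is carried through to the logical plan.
            assert_eq!(cmd.options.get("k1").map(String::as_str), Some("v1"));
        }
        Ok(())
    }
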
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 58b216240..83f0c6ab9 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -494,6 +494,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             table_partition_cols,
             if_not_exists,
             file_compression_type,
+            options,
         } = statement;
 
         // semantic checks
@@ -523,6 +524,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             if_not_exists,
             definition,
             file_compression_type,
+            options,
         }))
     }