You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/08 21:42:29 UTC
[arrow-datafusion] branch master updated: extend CreateExternalTable to allow for options. Rework TableProviderFactory to receive entire cmd (#4126)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 2a834d16a extend CreateExternalTable to allow for options. Rework TableProviderFactory to receive entire cmd (#4126)
2a834d16a is described below
commit 2a834d16ab54a1f0ad1b4cf826a0d1421dab9ae3
Author: Tim Van Wassenhove <ti...@timvw.be>
AuthorDate: Tue Nov 8 22:42:23 2022 +0100
extend CreateExternalTable to allow for options. Rework TableProviderFactory to receive entire cmd (#4126)
---
datafusion/core/src/catalog/listing_schema.rs | 26 +++++--
datafusion/core/src/datasource/datasource.rs | 4 +-
.../core/src/datasource/listing_table_factory.rs | 5 +-
datafusion/core/src/execution/context.rs | 2 +-
datafusion/core/src/test_util.rs | 8 +-
datafusion/expr/src/logical_plan/plan.rs | 4 +-
datafusion/proto/proto/datafusion.proto | 1 +
datafusion/proto/src/generated/pbjson.rs | 19 +++++
datafusion/proto/src/generated/prost.rs | 2 +
datafusion/proto/src/logical_plan.rs | 3 +
datafusion/sql/src/parser.rs | 88 ++++++++++++++++++++++
datafusion/sql/src/planner.rs | 2 +
12 files changed, 148 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/catalog/listing_schema.rs b/datafusion/core/src/catalog/listing_schema.rs
index 0a92590f6..bac2724a0 100644
--- a/datafusion/core/src/catalog/listing_schema.rs
+++ b/datafusion/core/src/catalog/listing_schema.rs
@@ -20,7 +20,8 @@ use crate::catalog::schema::SchemaProvider;
use crate::datasource::datasource::TableProviderFactory;
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
-use datafusion_common::{context, DataFusionError};
+use datafusion_common::{DFSchema, DataFusionError};
+use datafusion_expr::CreateExternalTable;
use futures::TryStreamExt;
use itertools::Itertools;
use object_store::ObjectStore;
@@ -108,13 +109,26 @@ impl ListingSchemaProvider {
})?;
if !self.table_exist(table_name) {
let table_url = format!("{}/{}", self.authority, table_path);
+
let provider = self
.factory
- .create(state, table_url.as_str())
- .await
- .map_err(|e| {
- context!(format!("Could not create table for {}", table_url), e)
- })?;
+ .create(
+ state,
+ &CreateExternalTable {
+ schema: Arc::new(DFSchema::empty()),
+ name: table_name.to_string(),
+ location: table_url,
+ file_type: "".to_string(),
+ has_header: false,
+ delimiter: ',',
+ table_partition_cols: vec![],
+ if_not_exists: false,
+ definition: None,
+ file_compression_type: "".to_string(),
+ options: Default::default(),
+ },
+ )
+ .await?;
let _ = self.register_table(table_name.to_string(), provider.clone())?;
}
}
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 873b11bb2..5a2ba4c2e 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use datafusion_common::Statistics;
-use datafusion_expr::LogicalPlan;
+use datafusion_expr::{CreateExternalTable, LogicalPlan};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
use crate::arrow::datatypes::SchemaRef;
@@ -95,6 +95,6 @@ pub trait TableProviderFactory: Sync + Send {
async fn create(
&self,
ctx: &SessionState,
- url: &str,
+ cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 117ab8fb2..253172738 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -30,6 +30,7 @@ use crate::datasource::listing::{
use crate::datasource::TableProvider;
use crate::execution::context::SessionState;
use async_trait::async_trait;
+use datafusion_expr::CreateExternalTable;
use std::sync::Arc;
/// A `TableProviderFactory` capable of creating new `ListingTable`s
@@ -49,7 +50,7 @@ impl TableProviderFactory for ListingTableFactory {
async fn create(
&self,
state: &SessionState,
- url: &str,
+ cmd: &CreateExternalTable,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
let file_extension = self.file_type.get_ext();
@@ -68,7 +69,7 @@ impl TableProviderFactory for ListingTableFactory {
table_partition_cols: vec![],
};
- let table_path = ListingTableUrl::parse(url)?;
+ let table_path = ListingTableUrl::parse(&cmd.location)?;
let resolved_schema = options.infer_schema(state, &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 553a208b8..8f5210998 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -506,7 +506,7 @@ impl SessionContext {
cmd.file_type
))
})?;
- let table = (*factory).create(&state, cmd.location.as_str()).await?;
+ let table = (*factory).create(&state, cmd).await?;
self.register_table(cmd.name.as_str(), table)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
diff --git a/datafusion/core/src/test_util.rs b/datafusion/core/src/test_util.rs
index 1510e8785..b97afe832 100644
--- a/datafusion/core/src/test_util.rs
+++ b/datafusion/core/src/test_util.rs
@@ -29,7 +29,7 @@ use crate::physical_plan::ExecutionPlan;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_common::DataFusionError;
-use datafusion_expr::{Expr, TableType};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
/// Compares formatted output of a record batch with an expected
/// vector of strings, with the result of pretty formatting record
@@ -284,11 +284,11 @@ pub struct TestTableFactory {}
impl TableProviderFactory for TestTableFactory {
async fn create(
&self,
- _state: &SessionState,
- url: &str,
+ _: &SessionState,
+ cmd: &CreateExternalTable,
) -> datafusion_common::Result<Arc<dyn TableProvider>> {
Ok(Arc::new(TestTableProvider {
- url: url.to_string(),
+ url: cmd.location.to_string(),
}))
}
}
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 60d9faede..6ad643fd1 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -26,7 +26,7 @@ use crate::utils::{
use crate::{Expr, ExprSchemable, TableProviderFilterPushDown, TableSource};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{plan_err, Column, DFSchema, DFSchemaRef, DataFusionError};
-use std::collections::HashSet;
+use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;
@@ -1338,6 +1338,8 @@ pub struct CreateExternalTable {
pub definition: Option<String>,
/// File compression type (GZIP, BZIP2)
pub file_compression_type: String,
+ /// Table-provider-specific options
+ pub options: HashMap<String, String>,
}
/// Produces a relation with string representations of
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index e40734538..de5f3749d 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -176,6 +176,7 @@ message CreateExternalTableNode {
string delimiter = 8;
string definition = 9;
string file_compression_type = 10;
+ map<string, string> options = 11;
}
message CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs
index 590ce61a2..fe67b590a 100644
--- a/datafusion/proto/src/generated/pbjson.rs
+++ b/datafusion/proto/src/generated/pbjson.rs
@@ -2391,6 +2391,9 @@ impl serde::Serialize for CreateExternalTableNode {
if !self.file_compression_type.is_empty() {
len += 1;
}
+ if !self.options.is_empty() {
+ len += 1;
+ }
let mut struct_ser = serializer.serialize_struct("datafusion.CreateExternalTableNode", len)?;
if !self.name.is_empty() {
struct_ser.serialize_field("name", &self.name)?;
@@ -2422,6 +2425,9 @@ impl serde::Serialize for CreateExternalTableNode {
if !self.file_compression_type.is_empty() {
struct_ser.serialize_field("fileCompressionType", &self.file_compression_type)?;
}
+ if !self.options.is_empty() {
+ struct_ser.serialize_field("options", &self.options)?;
+ }
struct_ser.end()
}
}
@@ -2447,6 +2453,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
"definition",
"file_compression_type",
"fileCompressionType",
+ "options",
];
#[allow(clippy::enum_variant_names)]
@@ -2461,6 +2468,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
Delimiter,
Definition,
FileCompressionType,
+ Options,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
@@ -2492,6 +2500,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
"delimiter" => Ok(GeneratedField::Delimiter),
"definition" => Ok(GeneratedField::Definition),
"fileCompressionType" | "file_compression_type" => Ok(GeneratedField::FileCompressionType),
+ "options" => Ok(GeneratedField::Options),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
@@ -2521,6 +2530,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
let mut delimiter__ = None;
let mut definition__ = None;
let mut file_compression_type__ = None;
+ let mut options__ = None;
while let Some(k) = map.next_key()? {
match k {
GeneratedField::Name => {
@@ -2583,6 +2593,14 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
}
file_compression_type__ = Some(map.next_value()?);
}
+ GeneratedField::Options => {
+ if options__.is_some() {
+ return Err(serde::de::Error::duplicate_field("options"));
+ }
+ options__ = Some(
+ map.next_value::<std::collections::HashMap<_, _>>()?
+ );
+ }
}
}
Ok(CreateExternalTableNode {
@@ -2596,6 +2614,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode {
delimiter: delimiter__.unwrap_or_default(),
definition: definition__.unwrap_or_default(),
file_compression_type: file_compression_type__.unwrap_or_default(),
+ options: options__.unwrap_or_default(),
})
}
}
diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs
index 09177845a..8962a7a8a 100644
--- a/datafusion/proto/src/generated/prost.rs
+++ b/datafusion/proto/src/generated/prost.rs
@@ -259,6 +259,8 @@ pub struct CreateExternalTableNode {
pub definition: ::prost::alloc::string::String,
#[prost(string, tag="10")]
pub file_compression_type: ::prost::alloc::string::String,
+ #[prost(map="string, string", tag="11")]
+ pub options: ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct CreateCatalogSchemaNode {
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index 4f71bdac5..ba0ed5eb9 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -585,6 +585,7 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists: create_extern_table.if_not_exists,
file_compression_type: create_extern_table.file_compression_type.to_string(),
definition,
+ options: create_extern_table.options.clone(),
}))
}
LogicalPlanType::CreateView(create_view) => {
@@ -1197,6 +1198,7 @@ impl AsLogicalPlan for LogicalPlanNode {
if_not_exists,
definition,
file_compression_type,
+ options,
}) => Ok(protobuf::LogicalPlanNode {
logical_plan_type: Some(LogicalPlanType::CreateExternalTable(
protobuf::CreateExternalTableNode {
@@ -1210,6 +1212,7 @@ impl AsLogicalPlan for LogicalPlanNode {
delimiter: String::from(*delimiter),
definition: definition.clone().unwrap_or_default(),
file_compression_type: file_compression_type.to_string(),
+ options: options.clone(),
},
)),
}),
diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs
index cc9078e2a..fe0c24b46 100644
--- a/datafusion/sql/src/parser.rs
+++ b/datafusion/sql/src/parser.rs
@@ -25,6 +25,7 @@ use sqlparser::{
parser::{Parser, ParserError},
tokenizer::{Token, Tokenizer},
};
+use std::collections::HashMap;
use std::{collections::VecDeque, fmt};
// Use `Parser::expected` instead, if possible
@@ -63,6 +64,8 @@ pub struct CreateExternalTable {
pub if_not_exists: bool,
/// File compression type (GZIP, BZIP2)
pub file_compression_type: String,
+ /// Table-provider-specific options
+ pub options: HashMap<String, String>,
}
impl fmt::Display for CreateExternalTable {
@@ -348,6 +351,12 @@ impl<'a> DFParser<'a> {
vec![]
};
+ let options = if self.parse_has_options() {
+ self.parse_options()?
+ } else {
+ HashMap::new()
+ };
+
self.parser.expect_keyword(Keyword::LOCATION)?;
let location = self.parser.parse_literal_string()?;
@@ -361,6 +370,7 @@ impl<'a> DFParser<'a> {
table_partition_cols,
if_not_exists,
file_compression_type,
+ options,
};
Ok(Statement::CreateExternalTable(create))
}
@@ -381,6 +391,33 @@ impl<'a> DFParser<'a> {
}
}
+ fn parse_has_options(&mut self) -> bool {
+ self.consume_token(&Token::make_keyword("OPTIONS"))
+ }
+
+ // Parses a parenthesized, comma-separated list of key/value pairs into a map
+ fn parse_options(&mut self) -> Result<HashMap<String, String>, ParserError> {
+ let mut options: HashMap<String, String> = HashMap::new();
+ self.parser.expect_token(&Token::LParen)?;
+
+ loop {
+ let key = self.parser.parse_literal_string()?;
+ let value = self.parser.parse_literal_string()?;
+ options.insert(key.to_string(), value.to_string());
+ let comma = self.parser.consume_token(&Token::Comma);
+ if self.parser.consume_token(&Token::RParen) {
+ // allow a trailing comma, even though it is not standard
+ break;
+ } else if !comma {
+ return self.expected(
+ "',' or ')' after option definition",
+ self.parser.peek_token(),
+ );
+ }
+ }
+ Ok(options)
+ }
+
fn consume_token(&mut self, expected: &Token) -> bool {
let token = self.parser.peek_token().to_string().to_uppercase();
let token = Token::make_keyword(&token);
@@ -486,6 +523,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: false,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -502,6 +540,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: false,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -518,6 +557,7 @@ mod tests {
table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
if_not_exists: false,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -537,6 +577,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: false,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
}
@@ -557,6 +598,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: false,
file_compression_type: file_compression_type.to_owned(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
}
@@ -573,6 +615,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: false,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -588,6 +631,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: false,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -603,6 +647,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: false,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -619,6 +664,7 @@ mod tests {
table_partition_cols: vec![],
if_not_exists: true,
file_compression_type: "".to_string(),
+ options: HashMap::new(),
});
expect_parse_ok(sql, expected)?;
@@ -627,6 +673,48 @@ mod tests {
"CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
expect_parse_error(sql, "sql parser error: Expected ',' or ')' after partition definition, found: int");
+ // positive case: additional options (one entry) can be specified
+ let sql =
+ "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1') LOCATION 'blahblah'";
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![],
+ file_type: "X".to_string(),
+ has_header: false,
+ delimiter: ',',
+ location: "blahblah".into(),
+ table_partition_cols: vec![],
+ if_not_exists: false,
+ file_compression_type: "".to_string(),
+ options: HashMap::from([("k1".into(), "v1".into())]),
+ });
+ expect_parse_ok(sql, expected)?;
+
+ // positive case: additional options (multiple entries) can be specified
+ let sql =
+ "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1', k2 v2) LOCATION 'blahblah'";
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![],
+ file_type: "X".to_string(),
+ has_header: false,
+ delimiter: ',',
+ location: "blahblah".into(),
+ table_partition_cols: vec![],
+ if_not_exists: false,
+ file_compression_type: "".to_string(),
+ options: HashMap::from([
+ ("k1".into(), "v1".into()),
+ ("k2".into(), "v2".into()),
+ ]),
+ });
+ expect_parse_ok(sql, expected)?;
+
+ // Error case: an option key without a value is rejected
+ let sql =
+ "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1', k2 v2, k3) LOCATION 'blahblah'";
+ expect_parse_error(sql, "sql parser error: Expected literal string, found: )");
+
Ok(())
}
}
diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs
index 58b216240..83f0c6ab9 100644
--- a/datafusion/sql/src/planner.rs
+++ b/datafusion/sql/src/planner.rs
@@ -494,6 +494,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
table_partition_cols,
if_not_exists,
file_compression_type,
+ options,
} = statement;
// semantic checks
@@ -523,6 +524,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
if_not_exists,
definition,
file_compression_type,
+ options,
}))
}