You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/06 18:07:26 UTC
[arrow-datafusion] branch master updated: Add delimiter for create external table (#2162)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 0c4ffd4f7 Add delimiter for create external table (#2162)
0c4ffd4f7 is described below
commit 0c4ffd4f7b064a8f4e4a167e2e4c913389056968
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Wed Apr 6 14:07:20 2022 -0400
Add delimiter for create external table (#2162)
* Add delimiter for create external table
* Clippy
* Rebase
---
ballista/rust/client/src/context.rs | 3 +-
ballista/rust/core/proto/ballista.proto | 1 +
ballista/rust/core/src/serde/logical_plan/mod.rs | 6 ++++
datafusion/core/src/execution/context.rs | 8 +++--
datafusion/core/src/logical_plan/plan.rs | 2 ++
datafusion/core/src/sql/parser.rs | 45 ++++++++++++++++++++++++
datafusion/core/src/sql/planner.rs | 2 ++
datafusion/core/tests/aggregate_simple_pipe.csv | 16 +++++++++
datafusion/core/tests/sql/create_drop.rs | 23 ++++++++++++
9 files changed, 103 insertions(+), 3 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 451eac25a..3a8a68505 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -375,6 +375,7 @@ impl BallistaContext {
ref location,
ref file_type,
ref has_header,
+ ref delimiter,
ref table_partition_cols,
ref if_not_exists,
}) => {
@@ -389,6 +390,7 @@ impl BallistaContext {
CsvReadOptions::new()
.schema(&schema.as_ref().to_owned().into())
.has_header(*has_header)
+ .delimiter(*delimiter as u8)
.table_partition_cols(table_partition_cols.to_vec()),
)
.await?;
@@ -428,7 +430,6 @@ impl BallistaContext {
))),
}
}
-
_ => ctx.sql(sql).await,
}
}
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index b4c63b04b..ae938c6a7 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -149,6 +149,7 @@ message CreateExternalTableNode {
datafusion.DfSchema schema = 5;
repeated string table_partition_cols = 6;
bool if_not_exists = 7;
+ string delimiter = 8;
}
message CreateCatalogSchemaNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index c84f6efa6..57cb35dec 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -326,6 +326,9 @@ impl AsLogicalPlan for LogicalPlanNode {
location: create_extern_table.location.clone(),
file_type: pb_file_type.into(),
has_header: create_extern_table.has_header,
+ delimiter: create_extern_table.delimiter.chars().next().ok_or_else(|| {
+ BallistaError::General(String::from("Protobuf deserialization error, unable to parse CSV delimiter"))
+ })?,
table_partition_cols: create_extern_table
.table_partition_cols
.clone(),
@@ -788,6 +791,7 @@ impl AsLogicalPlan for LogicalPlanNode {
location,
file_type,
has_header,
+ delimiter,
schema: df_schema,
table_partition_cols,
if_not_exists,
@@ -811,6 +815,7 @@ impl AsLogicalPlan for LogicalPlanNode {
schema: Some(df_schema.into()),
table_partition_cols: table_partition_cols.clone(),
if_not_exists: *if_not_exists,
+ delimiter: String::from(*delimiter),
},
)),
})
@@ -1158,6 +1163,7 @@ mod roundtrip_tests {
location: String::from("employee.csv"),
file_type: *file,
has_header: true,
+ delimiter: ',',
table_partition_cols: vec![],
if_not_exists: false,
});
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 2ec133ce5..85022019b 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -217,13 +217,17 @@ impl SessionContext {
ref location,
ref file_type,
ref has_header,
+ ref delimiter,
ref table_partition_cols,
ref if_not_exists,
}) => {
let (file_format, file_extension) = match file_type {
FileType::CSV => (
- Arc::new(CsvFormat::default().with_has_header(*has_header))
- as Arc<dyn FileFormat>,
+ Arc::new(
+ CsvFormat::default()
+ .with_has_header(*has_header)
+ .with_delimiter(*delimiter as u8),
+ ) as Arc<dyn FileFormat>,
DEFAULT_CSV_EXTENSION,
),
FileType::Parquet => (
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 88d2af070..88e0a185b 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -186,6 +186,8 @@ pub struct CreateExternalTable {
pub file_type: FileType,
/// Whether the CSV file contains a header
pub has_header: bool,
+ /// Delimiter for CSV
+ pub delimiter: char,
/// Partition Columns
pub table_partition_cols: Vec<String>,
/// Option to not error if table already exists
diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs
index 29ad9f8cd..2b98a3090 100644
--- a/datafusion/core/src/sql/parser.rs
+++ b/datafusion/core/src/sql/parser.rs
@@ -76,6 +76,8 @@ pub struct CreateExternalTable {
pub file_type: FileType,
/// CSV Header row?
pub has_header: bool,
+ /// User defined delimiter for CSVs
+ pub delimiter: char,
/// Path to file
pub location: String,
/// Partition Columns
@@ -313,6 +315,12 @@ impl<'a> DFParser<'a> {
let has_header = self.parse_csv_has_header();
+ let has_delimiter = self.parse_has_delimiter();
+ let delimiter = match has_delimiter {
+ true => self.parse_delimiter()?,
+ false => ',',
+ };
+
let table_partition_cols = if self.parse_has_partition() {
self.parse_partitions()?
} else {
@@ -327,6 +335,7 @@ impl<'a> DFParser<'a> {
columns,
file_type,
has_header,
+ delimiter,
location,
table_partition_cols,
if_not_exists,
@@ -359,6 +368,20 @@ impl<'a> DFParser<'a> {
& self.consume_token(&Token::make_keyword("ROW"))
}
+    /// Attempts to consume the `DELIMITER` keyword of a
+    /// `CREATE EXTERNAL TABLE` statement; the returned flag is exactly what
+    /// `consume_token` reports (presumably `true` when the keyword was present,
+    /// meaning a delimiter literal should follow).
+    fn parse_has_delimiter(&mut self) -> bool {
+        let delimiter_keyword = Token::make_keyword("DELIMITER");
+        self.consume_token(&delimiter_keyword)
+    }
+
+    /// Parses the quoted delimiter literal that follows the `DELIMITER`
+    /// keyword, e.g. the `'|'` in `DELIMITER '|'`.
+    ///
+    /// The literal must contain exactly one ASCII character. The delimiter is
+    /// later narrowed with `as u8` when configuring the CSV format, which would
+    /// silently truncate any non-ASCII `char`. The ASCII rule is therefore
+    /// checked explicitly instead of relying on the UTF-8 byte length.
+    ///
+    /// # Errors
+    /// Returns a `ParserError` if the next token is not a string literal, or
+    /// if the literal is empty, longer than one character, or not ASCII.
+    fn parse_delimiter(&mut self) -> Result<char, ParserError> {
+        let token = self.parser.parse_literal_string()?;
+        let mut chars = token.chars();
+        match (chars.next(), chars.next()) {
+            (Some(delimiter), None) if delimiter.is_ascii() => Ok(delimiter),
+            _ => Err(ParserError::ParserError(format!(
+                "Delimiter must be a single ASCII character, found '{}'",
+                token
+            ))),
+        }
+    }
+
fn parse_has_partition(&mut self) -> bool {
self.consume_token(&Token::make_keyword("PARTITIONED"))
& self.consume_token(&Token::make_keyword("BY"))
@@ -424,6 +447,22 @@ mod tests {
columns: vec![make_column_def("c1", DataType::Int(display))],
file_type: FileType::CSV,
has_header: false,
+ delimiter: ',',
+ location: "foo.csv".into(),
+ table_partition_cols: vec![],
+ if_not_exists: false,
+ });
+ expect_parse_ok(sql, expected)?;
+
+ // positive case with delimiter
+ let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV DELIMITER '|' LOCATION 'foo.csv'";
+ let display = None;
+ let expected = Statement::CreateExternalTable(CreateExternalTable {
+ name: "t".into(),
+ columns: vec![make_column_def("c1", DataType::Int(display))],
+ file_type: FileType::CSV,
+ has_header: false,
+ delimiter: '|',
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
@@ -438,6 +477,7 @@ mod tests {
columns: vec![make_column_def("c1", DataType::Int(display))],
file_type: FileType::CSV,
has_header: false,
+ delimiter: ',',
location: "foo.csv".into(),
table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
if_not_exists: false,
@@ -455,6 +495,7 @@ mod tests {
columns: vec![make_column_def("c1", DataType::Int(display))],
file_type: FileType::CSV,
has_header: true,
+ delimiter: ',',
location: "foo.csv".into(),
table_partition_cols: vec![],
if_not_exists: false,
@@ -469,6 +510,7 @@ mod tests {
columns: vec![],
file_type: FileType::Parquet,
has_header: false,
+ delimiter: ',',
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: false,
@@ -482,6 +524,7 @@ mod tests {
columns: vec![],
file_type: FileType::Parquet,
has_header: false,
+ delimiter: ',',
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: false,
@@ -495,6 +538,7 @@ mod tests {
columns: vec![],
file_type: FileType::Avro,
has_header: false,
+ delimiter: ',',
location: "foo.avro".into(),
table_partition_cols: vec![],
if_not_exists: false,
@@ -509,6 +553,7 @@ mod tests {
columns: vec![],
file_type: FileType::Parquet,
has_header: false,
+ delimiter: ',',
location: "foo.parquet".into(),
table_partition_cols: vec![],
if_not_exists: true,
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 153c405d6..3598492e7 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -318,6 +318,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
columns,
file_type,
has_header,
+ delimiter,
location,
table_partition_cols,
if_not_exists,
@@ -346,6 +347,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
location,
file_type,
has_header,
+ delimiter,
table_partition_cols,
if_not_exists,
}))
diff --git a/datafusion/core/tests/aggregate_simple_pipe.csv b/datafusion/core/tests/aggregate_simple_pipe.csv
new file mode 100644
index 000000000..edf05596f
--- /dev/null
+++ b/datafusion/core/tests/aggregate_simple_pipe.csv
@@ -0,0 +1,16 @@
+c1|c2|c3
+0.00001|0.000000000001|true
+0.00002|0.000000000002|false
+0.00002|0.000000000002|false
+0.00003|0.000000000003|true
+0.00003|0.000000000003|true
+0.00003|0.000000000003|true
+0.00004|0.000000000004|false
+0.00004|0.000000000004|false
+0.00004|0.000000000004|false
+0.00004|0.000000000004|false
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
\ No newline at end of file
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 6b1769751..59df5d404 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -191,3 +191,26 @@ async fn sql_create_table_if_not_exists() -> Result<()> {
Ok(())
}
+
+/// `CREATE EXTERNAL TABLE ... DELIMITER '|'` registers a CSV table whose
+/// fields are split on `|` rather than the default `,`.
+#[tokio::test]
+async fn create_pipe_delimited_csv_table() -> Result<()> {
+    let ctx = SessionContext::new();
+
+    let sql = "CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV WITH HEADER ROW DELIMITER '|' LOCATION 'tests/aggregate_simple_pipe.csv'";
+    // The test already returns `Result`, so surface planning/registration
+    // failures as errors instead of panicking.
+    ctx.sql(sql).await?;
+
+    let sql_all = "SELECT * FROM aggregate_simple order by c1 LIMIT 1";
+    let results_all = execute_to_batches(&ctx, sql_all).await;
+
+    let expected = vec![
+        "+---------+----------------+------+",
+        "| c1      | c2             | c3   |",
+        "+---------+----------------+------+",
+        "| 0.00001 | 0.000000000001 | true |",
+        "+---------+----------------+------+",
+    ];
+
+    assert_batches_eq!(expected, &results_all);
+
+    Ok(())
+}