You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/06 18:07:26 UTC

[arrow-datafusion] branch master updated: Add delimiter for create external table (#2162)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c4ffd4f7 Add delimiter for create external table (#2162)
0c4ffd4f7 is described below

commit 0c4ffd4f7b064a8f4e4a167e2e4c913389056968
Author: Matthew Turner <ma...@outlook.com>
AuthorDate: Wed Apr 6 14:07:20 2022 -0400

    Add delimiter for create external table (#2162)
    
    * Add delimiter for create external table
    
    * Clippy
    
    * Rebase
---
 ballista/rust/client/src/context.rs              |  3 +-
 ballista/rust/core/proto/ballista.proto          |  1 +
 ballista/rust/core/src/serde/logical_plan/mod.rs |  6 ++++
 datafusion/core/src/execution/context.rs         |  8 +++--
 datafusion/core/src/logical_plan/plan.rs         |  2 ++
 datafusion/core/src/sql/parser.rs                | 45 ++++++++++++++++++++++++
 datafusion/core/src/sql/planner.rs               |  2 ++
 datafusion/core/tests/aggregate_simple_pipe.csv  | 16 +++++++++
 datafusion/core/tests/sql/create_drop.rs         | 23 ++++++++++++
 9 files changed, 103 insertions(+), 3 deletions(-)

diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 451eac25a..3a8a68505 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -375,6 +375,7 @@ impl BallistaContext {
                 ref location,
                 ref file_type,
                 ref has_header,
+                ref delimiter,
                 ref table_partition_cols,
                 ref if_not_exists,
             }) => {
@@ -389,6 +390,7 @@ impl BallistaContext {
                                 CsvReadOptions::new()
                                     .schema(&schema.as_ref().to_owned().into())
                                     .has_header(*has_header)
+                                    .delimiter(*delimiter as u8)
                                     .table_partition_cols(table_partition_cols.to_vec()),
                             )
                             .await?;
@@ -428,7 +430,6 @@ impl BallistaContext {
                     ))),
                 }
             }
-
             _ => ctx.sql(sql).await,
         }
     }
diff --git a/ballista/rust/core/proto/ballista.proto b/ballista/rust/core/proto/ballista.proto
index b4c63b04b..ae938c6a7 100644
--- a/ballista/rust/core/proto/ballista.proto
+++ b/ballista/rust/core/proto/ballista.proto
@@ -149,6 +149,7 @@ message CreateExternalTableNode {
   datafusion.DfSchema schema = 5;
   repeated string table_partition_cols = 6;
   bool if_not_exists = 7;
+  string delimiter = 8;
 }
 
 message CreateCatalogSchemaNode {
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index c84f6efa6..57cb35dec 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -326,6 +326,9 @@ impl AsLogicalPlan for LogicalPlanNode {
                     location: create_extern_table.location.clone(),
                     file_type: pb_file_type.into(),
                     has_header: create_extern_table.has_header,
+                    delimiter: create_extern_table.delimiter.chars().next().ok_or_else(|| {
+                        BallistaError::General(String::from("Protobuf deserialization error, unable to parse CSV delimiter"))
+                    })?,
                     table_partition_cols: create_extern_table
                         .table_partition_cols
                         .clone(),
@@ -788,6 +791,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                 location,
                 file_type,
                 has_header,
+                delimiter,
                 schema: df_schema,
                 table_partition_cols,
                 if_not_exists,
@@ -811,6 +815,7 @@ impl AsLogicalPlan for LogicalPlanNode {
                             schema: Some(df_schema.into()),
                             table_partition_cols: table_partition_cols.clone(),
                             if_not_exists: *if_not_exists,
+                            delimiter: String::from(*delimiter),
                         },
                     )),
                 })
@@ -1158,6 +1163,7 @@ mod roundtrip_tests {
                     location: String::from("employee.csv"),
                     file_type: *file,
                     has_header: true,
+                    delimiter: ',',
                     table_partition_cols: vec![],
                     if_not_exists: false,
                 });
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 2ec133ce5..85022019b 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -217,13 +217,17 @@ impl SessionContext {
                 ref location,
                 ref file_type,
                 ref has_header,
+                ref delimiter,
                 ref table_partition_cols,
                 ref if_not_exists,
             }) => {
                 let (file_format, file_extension) = match file_type {
                     FileType::CSV => (
-                        Arc::new(CsvFormat::default().with_has_header(*has_header))
-                            as Arc<dyn FileFormat>,
+                        Arc::new(
+                            CsvFormat::default()
+                                .with_has_header(*has_header)
+                                .with_delimiter(*delimiter as u8),
+                        ) as Arc<dyn FileFormat>,
                         DEFAULT_CSV_EXTENSION,
                     ),
                     FileType::Parquet => (
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 88d2af070..88e0a185b 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -186,6 +186,8 @@ pub struct CreateExternalTable {
     pub file_type: FileType,
     /// Whether the CSV file contains a header
     pub has_header: bool,
+    /// Delimiter for CSV
+    pub delimiter: char,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
     /// Option to not error if table already exists
diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs
index 29ad9f8cd..2b98a3090 100644
--- a/datafusion/core/src/sql/parser.rs
+++ b/datafusion/core/src/sql/parser.rs
@@ -76,6 +76,8 @@ pub struct CreateExternalTable {
     pub file_type: FileType,
     /// CSV Header row?
     pub has_header: bool,
+    /// User defined delimiter for CSVs
+    pub delimiter: char,
     /// Path to file
     pub location: String,
     /// Partition Columns
@@ -313,6 +315,12 @@ impl<'a> DFParser<'a> {
 
         let has_header = self.parse_csv_has_header();
 
+        let has_delimiter = self.parse_has_delimiter();
+        let delimiter = match has_delimiter {
+            true => self.parse_delimiter()?,
+            false => ',',
+        };
+
         let table_partition_cols = if self.parse_has_partition() {
             self.parse_partitions()?
         } else {
@@ -327,6 +335,7 @@ impl<'a> DFParser<'a> {
             columns,
             file_type,
             has_header,
+            delimiter,
             location,
             table_partition_cols,
             if_not_exists,
@@ -359,6 +368,20 @@ impl<'a> DFParser<'a> {
             & self.consume_token(&Token::make_keyword("ROW"))
     }
 
+    fn parse_has_delimiter(&mut self) -> bool {
+        self.consume_token(&Token::make_keyword("DELIMITER")) // presumably true iff DELIMITER is next
+    }
+
+    /// Parses the next string literal as a delimiter; it must be one byte (ASCII) long.
+    fn parse_delimiter(&mut self) -> Result<char, ParserError> {
+        let literal = self.parser.parse_literal_string()?;
+        // A one-byte UTF-8 string is exactly one ASCII character.
+        if let [byte] = literal.as_bytes() {
+            return Ok(char::from(*byte));
+        }
+        Err(ParserError::TokenizerError("Delimiter must be a single char".to_string()))
+    }
+
     fn parse_has_partition(&mut self) -> bool {
         self.consume_token(&Token::make_keyword("PARTITIONED"))
             & self.consume_token(&Token::make_keyword("BY"))
@@ -424,6 +447,22 @@ mod tests {
             columns: vec![make_column_def("c1", DataType::Int(display))],
             file_type: FileType::CSV,
             has_header: false,
+            delimiter: ',',
+            location: "foo.csv".into(),
+            table_partition_cols: vec![],
+            if_not_exists: false,
+        });
+        expect_parse_ok(sql, expected)?;
+
+        // positive case with delimiter
+        let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV DELIMITER '|' LOCATION 'foo.csv'";
+        let display = None;
+        let expected = Statement::CreateExternalTable(CreateExternalTable {
+            name: "t".into(),
+            columns: vec![make_column_def("c1", DataType::Int(display))],
+            file_type: FileType::CSV,
+            has_header: false,
+            delimiter: '|',
             location: "foo.csv".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
@@ -438,6 +477,7 @@ mod tests {
             columns: vec![make_column_def("c1", DataType::Int(display))],
             file_type: FileType::CSV,
             has_header: false,
+            delimiter: ',',
             location: "foo.csv".into(),
             table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
             if_not_exists: false,
@@ -455,6 +495,7 @@ mod tests {
                 columns: vec![make_column_def("c1", DataType::Int(display))],
                 file_type: FileType::CSV,
                 has_header: true,
+                delimiter: ',',
                 location: "foo.csv".into(),
                 table_partition_cols: vec![],
                 if_not_exists: false,
@@ -469,6 +510,7 @@ mod tests {
             columns: vec![],
             file_type: FileType::Parquet,
             has_header: false,
+            delimiter: ',',
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
@@ -482,6 +524,7 @@ mod tests {
             columns: vec![],
             file_type: FileType::Parquet,
             has_header: false,
+            delimiter: ',',
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
@@ -495,6 +538,7 @@ mod tests {
             columns: vec![],
             file_type: FileType::Avro,
             has_header: false,
+            delimiter: ',',
             location: "foo.avro".into(),
             table_partition_cols: vec![],
             if_not_exists: false,
@@ -509,6 +553,7 @@ mod tests {
             columns: vec![],
             file_type: FileType::Parquet,
             has_header: false,
+            delimiter: ',',
             location: "foo.parquet".into(),
             table_partition_cols: vec![],
             if_not_exists: true,
diff --git a/datafusion/core/src/sql/planner.rs b/datafusion/core/src/sql/planner.rs
index 153c405d6..3598492e7 100644
--- a/datafusion/core/src/sql/planner.rs
+++ b/datafusion/core/src/sql/planner.rs
@@ -318,6 +318,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             columns,
             file_type,
             has_header,
+            delimiter,
             location,
             table_partition_cols,
             if_not_exists,
@@ -346,6 +347,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
             location,
             file_type,
             has_header,
+            delimiter,
             table_partition_cols,
             if_not_exists,
         }))
diff --git a/datafusion/core/tests/aggregate_simple_pipe.csv b/datafusion/core/tests/aggregate_simple_pipe.csv
new file mode 100644
index 000000000..edf05596f
--- /dev/null
+++ b/datafusion/core/tests/aggregate_simple_pipe.csv
@@ -0,0 +1,16 @@
+c1|c2|c3
+0.00001|0.000000000001|true
+0.00002|0.000000000002|false
+0.00002|0.000000000002|false
+0.00003|0.000000000003|true
+0.00003|0.000000000003|true
+0.00003|0.000000000003|true
+0.00004|0.000000000004|false
+0.00004|0.000000000004|false
+0.00004|0.000000000004|false
+0.00004|0.000000000004|false
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
+0.00005|0.000000000005|true
\ No newline at end of file
diff --git a/datafusion/core/tests/sql/create_drop.rs b/datafusion/core/tests/sql/create_drop.rs
index 6b1769751..59df5d404 100644
--- a/datafusion/core/tests/sql/create_drop.rs
+++ b/datafusion/core/tests/sql/create_drop.rs
@@ -191,3 +191,26 @@ async fn sql_create_table_if_not_exists() -> Result<()> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn create_pipe_delimited_csv_table() -> Result<()> {
+    // Rendering of the lowest-c1 row when the file is split into three columns.
+    let expected = vec![
+        "+---------+----------------+------+",
+        "| c1      | c2             | c3   |",
+        "+---------+----------------+------+",
+        "| 0.00001 | 0.000000000001 | true |",
+        "+---------+----------------+------+",
+    ];
+
+    // Register the '|'-separated fixture as an external CSV table with a header row.
+    let ctx = SessionContext::new();
+    let create_table = "CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV WITH HEADER ROW DELIMITER '|' LOCATION 'tests/aggregate_simple_pipe.csv'";
+    ctx.sql(create_table).await.unwrap();
+
+    // Query a single row, ordered by c1, and compare its formatted output.
+    let first_row_query = "SELECT * FROM aggregate_simple order by c1 LIMIT 1";
+    let batches = execute_to_batches(&ctx, first_row_query).await;
+    assert_batches_eq!(expected, &batches);
+    Ok(())
+}