You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/26 12:13:10 UTC

[arrow-datafusion] branch master updated: Support querying CSV files without providing the schema (#1050)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new d0fd9d2  Support querying CSV files without providing the schema (#1050)
d0fd9d2 is described below

commit d0fd9d2e7b116b084bf6e168d9f1f745397a6a26
Author: carlos <wx...@gmail.com>
AuthorDate: Sun Sep 26 20:12:37 2021 +0800

    Support querying CSV files without providing the schema (#1050)
---
 datafusion/src/execution/context.rs   | 13 ++++-----
 datafusion/src/sql/planner.rs         | 15 ++--------
 datafusion/tests/user_defined_plan.rs | 53 +++++++++++++++++++++++++++++++++++
 3 files changed, 62 insertions(+), 19 deletions(-)

diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index adc53a0..08effe7 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -186,13 +186,12 @@ impl ExecutionContext {
                 ref has_header,
             } => match file_type {
                 FileType::CSV => {
-                    self.register_csv(
-                        name,
-                        location,
-                        CsvReadOptions::new()
-                            .schema(&schema.as_ref().to_owned().into())
-                            .has_header(*has_header),
-                    )?;
+                    let mut options = CsvReadOptions::new().has_header(*has_header);
+                    let tmp_schema = schema.as_ref().to_owned().into();
+                    if !schema.fields().is_empty() {
+                        options = options.schema(&tmp_schema);
+                    }
+                    self.register_csv(name, location, options)?;
                     let plan = LogicalPlanBuilder::empty(false).build()?;
                     Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
                 }
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index d969a1f..140ae26 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -197,13 +197,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
 
         // semantic checks
         match *file_type {
-            FileType::CSV => {
-                if columns.is_empty() {
-                    return Err(DataFusionError::Plan(
-                        "Column definitions required for CSV files. None found".into(),
-                    ));
-                }
-            }
+            FileType::CSV => {}
             FileType::Parquet => {
                 if !columns.is_empty() {
                     return Err(DataFusionError::Plan(
@@ -2901,11 +2895,8 @@ mod tests {
     #[test]
     fn create_external_table_csv_no_schema() {
         let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'";
-        let err = logical_plan(sql).expect_err("query should have failed");
-        assert_eq!(
-            "Plan(\"Column definitions required for CSV files. None found\")",
-            format!("{:?}", err)
-        );
+        let expected = "CreateExternalTable: \"t\"";
+        quick_test(sql, expected);
     }
 
     #[test]
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 14600f2..27ad901 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -109,6 +109,22 @@ async fn setup_table(mut ctx: ExecutionContext) -> Result<ExecutionContext> {
     Ok(ctx)
 }
 
+async fn setup_table_without_schemas(
+    mut ctx: ExecutionContext,
+) -> Result<ExecutionContext> {
+    let sql = "CREATE EXTERNAL TABLE sales STORED AS CSV location 'tests/customer.csv'";
+
+    let expected = vec!["++", "++"];
+
+    let s = exec_sql(&mut ctx, sql).await?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(expected, actual, "Creating table");
+    Ok(ctx)
+}
+
+const QUERY1: &str = "SELECT * FROM sales limit 3";
+
 const QUERY: &str =
     "SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3";
 
@@ -142,6 +158,43 @@ async fn run_and_compare_query(
     Ok(())
 }
 
+// Run the query using the specified execution context and compare it
+// to the known result
+async fn run_and_compare_query_with_auto_schemas(
+    mut ctx: ExecutionContext,
+    description: &str,
+) -> Result<()> {
+    let expected = vec![
+        "+----------+----------+",
+        "| column_1 | column_2 |",
+        "+----------+----------+",
+        "| andrew   | 100      |",
+        "| jorge    | 200      |",
+        "| andy     | 150      |",
+        "+----------+----------+",
+    ];
+
+    let s = exec_sql(&mut ctx, QUERY1).await?;
+    let actual = s.lines().collect::<Vec<_>>();
+
+    assert_eq!(
+        expected,
+        actual,
+        "output mismatch for {}. Expectedn\n{}Actual:\n{}",
+        description,
+        expected.join("\n"),
+        s
+    );
+    Ok(())
+}
+
+#[tokio::test]
+// Run the query using default planners and optimizer
+async fn normal_query_without_schemas() -> Result<()> {
+    let ctx = setup_table_without_schemas(ExecutionContext::new()).await?;
+    run_and_compare_query_with_auto_schemas(ctx, "Default context").await
+}
+
 #[tokio::test]
 // Run the query using default planners and optimizer
 async fn normal_query() -> Result<()> {