You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/09/26 12:13:10 UTC
[arrow-datafusion] branch master updated: Support querying CSV files without providing the schema (#1050)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new d0fd9d2 Support querying CSV files without providing the schema (#1050)
d0fd9d2 is described below
commit d0fd9d2e7b116b084bf6e168d9f1f745397a6a26
Author: carlos <wx...@gmail.com>
AuthorDate: Sun Sep 26 20:12:37 2021 +0800
Support querying CSV files without providing the schema (#1050)
---
datafusion/src/execution/context.rs | 13 ++++-----
datafusion/src/sql/planner.rs | 15 ++--------
datafusion/tests/user_defined_plan.rs | 53 +++++++++++++++++++++++++++++++++++
3 files changed, 62 insertions(+), 19 deletions(-)
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index adc53a0..08effe7 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -186,13 +186,12 @@ impl ExecutionContext {
ref has_header,
} => match file_type {
FileType::CSV => {
- self.register_csv(
- name,
- location,
- CsvReadOptions::new()
- .schema(&schema.as_ref().to_owned().into())
- .has_header(*has_header),
- )?;
+ let mut options = CsvReadOptions::new().has_header(*has_header);
+ let tmp_schema = schema.as_ref().to_owned().into();
+ if !schema.fields().is_empty() {
+ options = options.schema(&tmp_schema);
+ }
+ self.register_csv(name, location, options)?;
let plan = LogicalPlanBuilder::empty(false).build()?;
Ok(Arc::new(DataFrameImpl::new(self.state.clone(), &plan)))
}
diff --git a/datafusion/src/sql/planner.rs b/datafusion/src/sql/planner.rs
index d969a1f..140ae26 100644
--- a/datafusion/src/sql/planner.rs
+++ b/datafusion/src/sql/planner.rs
@@ -197,13 +197,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
// semantic checks
match *file_type {
- FileType::CSV => {
- if columns.is_empty() {
- return Err(DataFusionError::Plan(
- "Column definitions required for CSV files. None found".into(),
- ));
- }
- }
+ FileType::CSV => {}
FileType::Parquet => {
if !columns.is_empty() {
return Err(DataFusionError::Plan(
@@ -2901,11 +2895,8 @@ mod tests {
#[test]
fn create_external_table_csv_no_schema() {
let sql = "CREATE EXTERNAL TABLE t STORED AS CSV LOCATION 'foo.csv'";
- let err = logical_plan(sql).expect_err("query should have failed");
- assert_eq!(
- "Plan(\"Column definitions required for CSV files. None found\")",
- format!("{:?}", err)
- );
+ let expected = "CreateExternalTable: \"t\"";
+ quick_test(sql, expected);
}
#[test]
diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs
index 14600f2..27ad901 100644
--- a/datafusion/tests/user_defined_plan.rs
+++ b/datafusion/tests/user_defined_plan.rs
@@ -109,6 +109,22 @@ async fn setup_table(mut ctx: ExecutionContext) -> Result<ExecutionContext> {
Ok(ctx)
}
+async fn setup_table_without_schemas(
+ mut ctx: ExecutionContext,
+) -> Result<ExecutionContext> {
+ let sql = "CREATE EXTERNAL TABLE sales STORED AS CSV location 'tests/customer.csv'";
+
+ let expected = vec!["++", "++"];
+
+ let s = exec_sql(&mut ctx, sql).await?;
+ let actual = s.lines().collect::<Vec<_>>();
+
+ assert_eq!(expected, actual, "Creating table");
+ Ok(ctx)
+}
+
+const QUERY1: &str = "SELECT * FROM sales limit 3";
+
const QUERY: &str =
"SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3";
@@ -142,6 +158,43 @@ async fn run_and_compare_query(
Ok(())
}
+// Run the query using the specified execution context and compare it
+// to the known result
+async fn run_and_compare_query_with_auto_schemas(
+ mut ctx: ExecutionContext,
+ description: &str,
+) -> Result<()> {
+ let expected = vec![
+ "+----------+----------+",
+ "| column_1 | column_2 |",
+ "+----------+----------+",
+ "| andrew | 100 |",
+ "| jorge | 200 |",
+ "| andy | 150 |",
+ "+----------+----------+",
+ ];
+
+ let s = exec_sql(&mut ctx, QUERY1).await?;
+ let actual = s.lines().collect::<Vec<_>>();
+
+ assert_eq!(
+ expected,
+ actual,
+ "output mismatch for {}. Expectedn\n{}Actual:\n{}",
+ description,
+ expected.join("\n"),
+ s
+ );
+ Ok(())
+}
+
+#[tokio::test]
+// Run the query using default planners and optimizer
+async fn normal_query_without_schemas() -> Result<()> {
+ let ctx = setup_table_without_schemas(ExecutionContext::new()).await?;
+ run_and_compare_query_with_auto_schemas(ctx, "Default context").await
+}
+
#[tokio::test]
// Run the query using default planners and optimizer
async fn normal_query() -> Result<()> {