Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/16 22:12:21 UTC
[arrow-datafusion] branch master updated: Remove `scan_csv` methods from `LogicalPlanBuilder` (#2537)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 5223d6f31 Remove `scan_csv` methods from `LogicalPlanBuilder` (#2537)
5223d6f31 is described below
commit 5223d6f31752b95e21d655ebdb57fbc6ff807059
Author: Andy Grove <ag...@apache.org>
AuthorDate: Mon May 16 16:12:17 2022 -0600
Remove `scan_csv` methods from `LogicalPlanBuilder` (#2537)
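
    Plans that previously went through the removed builder methods now go
    through SessionContext. A minimal sketch of the migration, taken from
    the test changes below (the file path is illustrative):

        // Before this commit: scan a CSV directly through the builder
        let plan = LogicalPlanBuilder::scan_csv(
            Arc::new(LocalFileSystem {}),
            "employee.csv",
            CsvReadOptions::new(),
            None, // projection
            1,    // target_partitions
        )
        .await?
        .build()?;

        // After this commit: read the CSV through a SessionContext and
        // extract the logical plan from the resulting DataFrame
        let ctx = SessionContext::new();
        let plan = ctx
            .read_csv("employee.csv", CsvReadOptions::new())
            .await?
            .to_logical_plan()?;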
---
ballista/rust/core/src/serde/logical_plan/mod.rs | 283 ++++++++---------------
ballista/rust/core/src/serde/mod.rs              |  21 +-
datafusion/core/src/execution/context.rs         |  29 ++-
datafusion/core/src/logical_plan/builder.rs      |  73 +-----
datafusion/core/src/physical_plan/planner.rs     | 211 ++++++-----------
datafusion/core/tests/sql/projection.rs          |   3 +-
6 files changed, 198 insertions(+), 422 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index e6d9e6289..1dd61a1e9 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -1016,19 +1016,16 @@ macro_rules! into_logical_plan {
mod roundtrip_tests {
use super::super::{super::error::Result, protobuf};
- use crate::error::BallistaError;
use crate::serde::{AsLogicalPlan, BallistaCodec};
use async_trait::async_trait;
use core::panic;
+ use datafusion::common::DFSchemaRef;
use datafusion::logical_plan::source_as_provider;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datafusion_data_access::{
self,
- object_store::{
- local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
- ObjectStore,
- },
+ object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
SizedFile,
},
datasource::listing::ListingTable,
@@ -1142,26 +1139,11 @@ mod roundtrip_tests {
let test_expr: Vec<Expr> =
vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
-
let plan = std::sync::Arc::new(
- LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?,
+ test_scan_csv("employee.csv", Some(vec![3, 4]))
+ .await?
+ .sort(vec![col("salary")])?
+ .build()?,
);
for partition_count in test_partition_counts.iter() {
@@ -1198,13 +1180,7 @@ mod roundtrip_tests {
#[test]
fn roundtrip_create_external_table() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
+ let schema = test_schema();
let df_schema_ref = schema.to_dfschema_ref()?;
@@ -1236,39 +1212,17 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_analyze() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
-
- let verbose_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.explain(true, true))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
-
- let plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.explain(false, true))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
+ let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ .await?
+ .sort(vec![col("salary")])?
+ .explain(true, true)?
+ .build()?;
+
+ let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ .await?
+ .sort(vec![col("salary")])?
+ .explain(false, true)?
+ .build()?;
roundtrip_test!(plan);
@@ -1279,39 +1233,17 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_explain() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
-
- let verbose_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.explain(true, false))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
-
- let plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.explain(false, false))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
+ let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ .await?
+ .sort(vec![col("salary")])?
+ .explain(true, false)?
+ .build()?;
+
+ let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ .await?
+ .sort(vec![col("salary")])?
+ .explain(false, false)?
+ .build()?;
roundtrip_test!(plan);
@@ -1322,36 +1254,14 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_join() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
-
- let scan_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee1",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![0, 3, 4]),
- 4,
- )
- .await?
- .build()
- .map_err(BallistaError::DataFusionError)?;
-
- let plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee2",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![0, 3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"])))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
+ let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4]))
+ .await?
+ .build()?;
+
+ let plan = test_scan_csv("employee2", Some(vec![0, 3, 4]))
+ .await?
+ .join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"]))?
+ .build()?;
roundtrip_test!(plan);
Ok(())
@@ -1359,25 +1269,10 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_sort() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
-
- let plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.sort(vec![col("salary")]))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
+ let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ .await?
+ .sort(vec![col("salary")])?
+ .build()?;
roundtrip_test!(plan);
Ok(())
@@ -1385,15 +1280,11 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_empty_relation() -> Result<()> {
- let plan_false = LogicalPlanBuilder::empty(false)
- .build()
- .map_err(BallistaError::DataFusionError)?;
+ let plan_false = LogicalPlanBuilder::empty(false).build()?;
roundtrip_test!(plan_false);
- let plan_true = LogicalPlanBuilder::empty(true)
- .build()
- .map_err(BallistaError::DataFusionError)?;
+ let plan_true = LogicalPlanBuilder::empty(true).build()?;
roundtrip_test!(plan_true);
@@ -1402,31 +1293,17 @@ mod roundtrip_tests {
#[tokio::test]
async fn roundtrip_logical_plan() -> Result<()> {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
-
- let plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- "employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.aggregate(vec![col("state")], vec![max(col("salary"))]))
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
+ let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+ .await?
+ .aggregate(vec![col("state")], vec![max(col("salary"))])?
+ .build()?;
roundtrip_test!(plan);
Ok(())
}
+ #[ignore] // see https://github.com/apache/arrow-datafusion/issues/2546
#[tokio::test]
async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
let ctx = SessionContext::new();
@@ -1436,28 +1313,18 @@ mod roundtrip_tests {
ctx.runtime_env()
.register_object_store("test", custom_object_store.clone());
- let (os, _) = ctx.runtime_env().object_store("test://foo.csv")?;
-
- println!("Object Store {:?}", os);
+ let (os, uri) = ctx.runtime_env().object_store("test://foo.csv")?;
+ assert_eq!("TestObjectStore", &format!("{:?}", os));
+ assert_eq!("foo.csv", uri);
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ]);
-
- let plan = LogicalPlanBuilder::scan_csv(
- custom_object_store.clone(),
- "test://employee.csv",
- CsvReadOptions::new().schema(&schema).has_header(true),
- Some(vec![3, 4]),
- 4,
- )
- .await
- .and_then(|plan| plan.build())
- .map_err(BallistaError::DataFusionError)?;
+ let schema = test_schema();
+ let plan = ctx
+ .read_csv(
+ "test://employee.csv",
+ CsvReadOptions::new().schema(&schema).has_header(true),
+ )
+ .await?
+ .to_logical_plan()?;
let proto: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
@@ -1488,4 +1355,36 @@ mod roundtrip_tests {
Ok(())
}
+
+ fn test_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("id", DataType::Int32, false),
+ Field::new("first_name", DataType::Utf8, false),
+ Field::new("last_name", DataType::Utf8, false),
+ Field::new("state", DataType::Utf8, false),
+ Field::new("salary", DataType::Int32, false),
+ ])
+ }
+
+ async fn test_scan_csv(
+ table_name: &str,
+ projection: Option<Vec<usize>>,
+ ) -> Result<LogicalPlanBuilder> {
+ let schema = test_schema();
+ let ctx = SessionContext::new();
+ let options = CsvReadOptions::new().schema(&schema);
+ let df = ctx.read_csv(table_name, options).await?;
+ let plan = match df.to_logical_plan()? {
+ LogicalPlan::TableScan(ref scan) => {
+ let mut scan = scan.clone();
+ scan.projection = projection;
+ let mut projected_schema = scan.projected_schema.as_ref().clone();
+ projected_schema = projected_schema.replace_qualifier(table_name);
+ scan.projected_schema = DFSchemaRef::new(projected_schema);
+ LogicalPlan::TableScan(scan)
+ }
+ _ => unimplemented!(),
+ };
+ Ok(LogicalPlanBuilder::from(plan))
+ }
}
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index bc2d7ff6b..236e66c7d 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -351,14 +351,12 @@ fn str_to_byte(s: &str) -> Result<u8, BallistaError> {
mod tests {
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
- use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
use datafusion::error::DataFusionError;
use datafusion::execution::context::{QueryPlanner, SessionState, TaskContext};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::logical_plan::plan::Extension;
use datafusion::logical_plan::{
- col, DFSchemaRef, Expr, FunctionRegistry, LogicalPlan, LogicalPlanBuilder,
- UserDefinedLogicalNode,
+ col, DFSchemaRef, Expr, FunctionRegistry, LogicalPlan, UserDefinedLogicalNode,
};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner};
@@ -699,7 +697,6 @@ mod tests {
#[tokio::test]
async fn test_extension_plan() -> crate::error::Result<()> {
- let store = Arc::new(LocalFileSystem {});
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let session_state =
SessionState::with_config_rt(SessionConfig::new(), runtime.clone())
@@ -707,15 +704,13 @@ mod tests {
let ctx = SessionContext::with_state(session_state);
- let scan = LogicalPlanBuilder::scan_csv(
- store,
- "../../../datafusion/core/tests/customer.csv",
- CsvReadOptions::default(),
- None,
- 1,
- )
- .await?
- .build()?;
+ let scan = ctx
+ .read_csv(
+ "../../../datafusion/core/tests/customer.csv",
+ CsvReadOptions::default(),
+ )
+ .await?
+ .to_logical_plan()?;
let topk_plan = LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode::new(3, scan, col("revenue"))),
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 394d03755..ca3bca61d 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -571,18 +571,23 @@ impl SessionContext {
let uri: String = uri.into();
let (object_store, path) = self.runtime_env().object_store(&uri)?;
let target_partitions = self.copied_config().target_partitions;
- Ok(Arc::new(DataFrame::new(
- self.state.clone(),
- &LogicalPlanBuilder::scan_csv(
- object_store,
- path,
- options,
- None,
- target_partitions,
- )
- .await?
- .build()?,
- )))
+ let path = path.to_string();
+ let listing_options = options.to_listing_options(target_partitions);
+ let resolved_schema = match options.schema {
+ Some(s) => Arc::new(s.to_owned()),
+ None => {
+ listing_options
+ .infer_schema(Arc::clone(&object_store), &path)
+ .await?
+ }
+ };
+ let config = ListingTableConfig::new(object_store, path.clone())
+ .with_listing_options(listing_options)
+ .with_schema(resolved_schema);
+ let provider = ListingTable::try_new(config)?;
+
+ let plan = LogicalPlanBuilder::scan(path, Arc::new(provider), None)?.build()?;
+ Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
}
/// Creates a DataFrame for reading a Parquet data source.
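
Note: from the user's side, SessionContext::read_csv remains the entry point;
only its internals changed to build the ListingTable inline. A brief usage
sketch (the path is illustrative):

    let ctx = SessionContext::new();
    let df = ctx
        .read_csv("tests/example.csv", CsvReadOptions::new())
        .await?;
    // executes the plan and collects the results as RecordBatches
    let batches = df.collect().await?;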
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 0b419b840..246eeb756 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -17,11 +17,7 @@
//! This module provides a builder for creating LogicalPlans
-use crate::datasource::{
- empty::EmptyTable,
- listing::{ListingTable, ListingTableConfig},
- MemTable, TableProvider,
-};
+use crate::datasource::{empty::EmptyTable, TableProvider};
use crate::error::{DataFusionError, Result};
use crate::logical_expr::ExprSchemable;
use crate::logical_plan::plan::{
@@ -29,13 +25,8 @@ use crate::logical_plan::plan::{
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
};
use crate::optimizer::utils;
-use crate::prelude::*;
use crate::scalar::ScalarValue;
-use arrow::{
- datatypes::{DataType, Schema, SchemaRef},
- record_batch::RecordBatch,
-};
-use datafusion_data_access::object_store::ObjectStore;
+use arrow::datatypes::{DataType, Schema};
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_to_columns,
};
@@ -197,65 +188,6 @@ impl LogicalPlanBuilder {
Ok(Self::from(LogicalPlan::Values(Values { schema, values })))
}
- /// Scan a memory data source
- pub fn scan_memory(
- partitions: Vec<Vec<RecordBatch>>,
- schema: SchemaRef,
- projection: Option<Vec<usize>>,
- ) -> Result<Self> {
- let provider = Arc::new(MemTable::try_new(schema, partitions)?);
- Self::scan(UNNAMED_TABLE, provider, projection)
- }
-
- /// Scan a CSV data source
- pub async fn scan_csv(
- object_store: Arc<dyn ObjectStore>,
- path: impl Into<String>,
- options: CsvReadOptions<'_>,
- projection: Option<Vec<usize>>,
- target_partitions: usize,
- ) -> Result<Self> {
- let path = path.into();
- Self::scan_csv_with_name(
- object_store,
- path.clone(),
- options,
- projection,
- path,
- target_partitions,
- )
- .await
- }
-
- /// Scan a CSV data source and register it with a given table name
- pub async fn scan_csv_with_name(
- object_store: Arc<dyn ObjectStore>,
- path: impl Into<String>,
- options: CsvReadOptions<'_>,
- projection: Option<Vec<usize>>,
- table_name: impl Into<String>,
- target_partitions: usize,
- ) -> Result<Self> {
- let listing_options = options.to_listing_options(target_partitions);
-
- let path: String = path.into();
-
- let resolved_schema = match options.schema {
- Some(s) => Arc::new(s.to_owned()),
- None => {
- listing_options
- .infer_schema(Arc::clone(&object_store), &path)
- .await?
- }
- };
- let config = ListingTableConfig::new(object_store, path)
- .with_listing_options(listing_options)
- .with_schema(resolved_schema);
- let provider = ListingTable::try_new(config)?;
-
- Self::scan(table_name, Arc::new(provider), projection)
- }
-
/// Scan an empty data source, mainly used in tests
pub fn scan_empty(
name: Option<&str>,
@@ -1009,6 +941,7 @@ mod tests {
use datafusion_expr::expr_fn::exists;
use crate::logical_plan::StringifiedPlan;
+ use crate::prelude::*;
use crate::test::test_table_scan_with_name;
use super::super::{col, lit, sum};
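
Note: scan_memory was removed alongside the CSV methods; callers now construct
a MemTable provider and pass it to the generic scan, as the projection test at
the end of this diff does. A minimal sketch, assuming `schema` (a SchemaRef)
and `partitions` (a Vec<Vec<RecordBatch>>) are already built:

    let provider = Arc::new(MemTable::try_new(schema, partitions)?);
    let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?
        .build()?;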
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 47829ad79..77aee4102 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1528,7 +1528,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
#[cfg(test)]
mod tests {
use super::*;
- use crate::datafusion_data_access::object_store::local::LocalFileSystem;
use crate::execution::context::TaskContext;
use crate::execution::options::CsvReadOptions;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -1536,7 +1535,7 @@ mod tests {
use crate::physical_plan::{
expressions, DisplayFormatType, Partitioning, Statistics,
};
- use crate::prelude::SessionConfig;
+ use crate::prelude::{SessionConfig, SessionContext};
use crate::scalar::ScalarValue;
use crate::{
logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream,
@@ -1566,25 +1565,15 @@ mod tests {
#[tokio::test]
async fn test_all_operators() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
- let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- path,
- options,
- None,
- 1,
- )
- .await?
- // filter clause needs the type coercion rule applied
- .filter(col("c7").lt(lit(5_u8)))?
- .project(vec![col("c1"), col("c2")])?
- .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
- .sort(vec![col("c1").sort(true, true)])?
- .limit(10)?
- .build()?;
+ let logical_plan = test_csv_scan()
+ .await?
+ // filter clause needs the type coercion rule applied
+ .filter(col("c7").lt(lit(5_u8)))?
+ .project(vec![col("c1"), col("c2")])?
+ .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+ .sort(vec![col("c1").sort(true, true)])?
+ .limit(10)?
+ .build()?;
let plan = plan(&logical_plan).await?;
@@ -1618,20 +1607,10 @@ mod tests {
#[tokio::test]
async fn test_with_csv_plan() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
- let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- path,
- options,
- None,
- 1,
- )
- .await?
- .filter(col("c7").lt(col("c12")))?
- .build()?;
+ let logical_plan = test_csv_scan()
+ .await?
+ .filter(col("c7").lt(col("c12")))?
+ .build()?;
let plan = plan(&logical_plan).await?;
@@ -1644,10 +1623,6 @@ mod tests {
#[tokio::test]
async fn errors() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
- let options = CsvReadOptions::new().schema_infer_max_records(100);
-
let bool_expr = col("c1").eq(col("c1"));
let cases = vec![
// utf8 < u32
@@ -1666,15 +1641,7 @@ mod tests {
col("c1").like(col("c2")),
];
for case in cases {
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- &path,
- options.clone(),
- None,
- 1,
- )
- .await?
- .project(vec![case.clone()]);
+ let logical_plan = test_csv_scan().await?.project(vec![case.clone()]);
let message = format!(
"Expression {:?} expected to error due to impossible coercion",
case
@@ -1757,27 +1724,17 @@ mod tests {
#[tokio::test]
async fn in_list_types() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
- let options = CsvReadOptions::new().schema_infer_max_records(100);
-
// expression: "a in ('a', 1)"
let list = vec![
Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
Expr::Literal(ScalarValue::Int64(Some(1))),
];
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- &path,
- options.clone(),
- None,
- 1,
- )
- .await?
- // filter clause needs the type coercion rule applied
- .filter(col("c12").lt(lit(0.05)))?
- .project(vec![col("c1").in_list(list, false)])?
- .build()?;
+ let logical_plan = test_csv_scan()
+ .await?
+ // filter clause needs the type coercion rule applied
+ .filter(col("c12").lt(lit(0.05)))?
+ .project(vec![col("c1").in_list(list, false)])?
+ .build()?;
let execution_plan = plan(&logical_plan).await?;
// verify that the plan correctly adds cast from Int64(1) to Utf8
let expected = "InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }], negated: false, set: None }";
@@ -1788,18 +1745,12 @@ mod tests {
Expr::Literal(ScalarValue::Boolean(Some(true))),
Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
];
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- &path,
- options.clone(),
- None,
- 1,
- )
- .await?
- // filter clause needs the type coercion rule applied
- .filter(col("c12").lt(lit(0.05)))?
- .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
- .build()?;
+ let logical_plan = test_csv_scan()
+ .await?
+ // filter clause needs the type coercion rule applied
+ .filter(col("c12").lt(lit(0.05)))?
+ .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
+ .build()?;
let execution_plan = plan(&logical_plan).await;
let expected_error = "Unsupported CAST from Utf8 to Boolean";
@@ -1818,10 +1769,6 @@ mod tests {
#[tokio::test]
async fn in_set_test() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
- let options = CsvReadOptions::new().schema_infer_max_records(100);
-
// OPTIMIZER_INSET_THRESHOLD = 10
// expression: "a in ('a', 1, 2, ..30)"
let mut list = vec![Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))];
@@ -1829,17 +1776,11 @@ mod tests {
list.push(Expr::Literal(ScalarValue::Int64(Some(i))));
}
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- &path,
- options,
- None,
- 1,
- )
- .await?
- .filter(col("c12").lt(lit(0.05)))?
- .project(vec![col("c1").in_list(list, false)])?
- .build()?;
+ let logical_plan = test_csv_scan()
+ .await?
+ .filter(col("c12").lt(lit(0.05)))?
+ .project(vec![col("c1").in_list(list, false)])?
+ .build()?;
let execution_plan = plan(&logical_plan).await?;
let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(2) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(3) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(4) [...]
assert!(format!("{:?}", execution_plan).contains(expected));
@@ -1848,26 +1789,17 @@ mod tests {
#[tokio::test]
async fn in_set_null_test() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
- let options = CsvReadOptions::new().schema_infer_max_records(100);
// test NULL
let mut list = vec![Expr::Literal(ScalarValue::Int64(None))];
for i in 1..31 {
list.push(Expr::Literal(ScalarValue::Int64(Some(i))));
}
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- &path,
- options,
- None,
- 1,
- )
- .await?
- .filter(col("c12").lt(lit(0.05)))?
- .project(vec![col("c1").in_list(list, false)])?
- .build()?;
+ let logical_plan = test_csv_scan()
+ .await?
+ .filter(col("c12").lt(lit(0.05)))?
+ .project(vec![col("c1").in_list(list, false)])?
+ .build()?;
let execution_plan = plan(&logical_plan).await?;
let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [CastExpr { expr: Literal { value: Int64(NULL) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(2) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(3) }, cast_type: Utf8, cast_opti [...]
assert!(format!("{:?}", execution_plan).contains(expected));
@@ -1876,21 +1808,10 @@ mod tests {
#[tokio::test]
async fn hash_agg_input_schema() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
- let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv_with_name(
- Arc::new(LocalFileSystem {}),
- &path,
- options,
- None,
- "aggregate_test_100",
- 1,
- )
- .await?
- .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
- .build()?;
+ let logical_plan = test_csv_scan_with_name("aggregate_test_100")
+ .await?
+ .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+ .build()?;
let execution_plan = plan(&logical_plan).await?;
let final_hash_agg = execution_plan
@@ -1910,20 +1831,10 @@ mod tests {
#[tokio::test]
async fn hash_agg_group_by_partitioned() -> Result<()> {
- let testdata = crate::test_util::arrow_test_data();
- let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
- let options = CsvReadOptions::new().schema_infer_max_records(100);
- let logical_plan = LogicalPlanBuilder::scan_csv(
- Arc::new(LocalFileSystem {}),
- &path,
- options,
- None,
- 1,
- )
- .await?
- .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
- .build()?;
+ let logical_plan = test_csv_scan()
+ .await?
+ .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+ .build()?;
let execution_plan = plan(&logical_plan).await?;
let formatted = format!("{:?}", execution_plan);
@@ -2112,4 +2023,36 @@ mod tests {
})))
}
}
+
+ async fn test_csv_scan_with_name(name: &str) -> Result<LogicalPlanBuilder> {
+ let ctx = SessionContext::new();
+ let testdata = crate::test_util::arrow_test_data();
+ let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+ let options = CsvReadOptions::new().schema_infer_max_records(100);
+ let logical_plan = match ctx.read_csv(path, options).await?.to_logical_plan()? {
+ LogicalPlan::TableScan(ref scan) => {
+ let mut scan = scan.clone();
+ scan.table_name = name.to_string();
+ let new_schema = scan
+ .projected_schema
+ .as_ref()
+ .clone()
+ .replace_qualifier(name);
+ scan.projected_schema = Arc::new(new_schema);
+ LogicalPlan::TableScan(scan)
+ }
+ _ => unimplemented!(),
+ };
+ Ok(LogicalPlanBuilder::from(logical_plan))
+ }
+
+ async fn test_csv_scan() -> Result<LogicalPlanBuilder> {
+ let ctx = SessionContext::new();
+ let testdata = crate::test_util::arrow_test_data();
+ let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+ let options = CsvReadOptions::new().schema_infer_max_records(100);
+ Ok(LogicalPlanBuilder::from(
+ ctx.read_csv(path, options).await?.to_logical_plan()?,
+ ))
+ }
}
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index 25ff30f0e..0b7ce860e 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -237,7 +237,8 @@ async fn projection_on_memory_scan() -> Result<()> {
],
)?]];
- let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
+ let provider = Arc::new(MemTable::try_new(schema, partitions)?);
+ let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?
.project(vec![col("b")])?
.build()?;
assert_fields_eq(&plan, vec!["b"]);