Posted to commits@arrow.apache.org by ag...@apache.org on 2022/05/16 22:12:21 UTC

[arrow-datafusion] branch master updated: Remove `scan_csv` methods from `LogicalPlanBuilder` (#2537)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 5223d6f31 Remove `scan_csv` methods from `LogicalPlanBuilder` (#2537)
5223d6f31 is described below

commit 5223d6f31752b95e21d655ebdb57fbc6ff807059
Author: Andy Grove <ag...@apache.org>
AuthorDate: Mon May 16 16:12:17 2022 -0600

    Remove `scan_csv` methods from `LogicalPlanBuilder` (#2537)
---
 ballista/rust/core/src/serde/logical_plan/mod.rs | 283 ++++++++---------------
 ballista/rust/core/src/serde/mod.rs              |  21 +-
 datafusion/core/src/execution/context.rs         |  29 ++-
 datafusion/core/src/logical_plan/builder.rs      |  73 +-----
 datafusion/core/src/physical_plan/planner.rs     | 211 ++++++-----------
 datafusion/core/tests/sql/projection.rs          |   3 +-
 6 files changed, 198 insertions(+), 422 deletions(-)
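For readers migrating off the removed API, the change in call pattern looks like this (a minimal sketch drawn from the diff below; it assumes a Schema named `schema` is in scope and uses an illustrative file name):

    // Before this commit: scan a CSV through the builder directly
    let plan = LogicalPlanBuilder::scan_csv(
        Arc::new(LocalFileSystem {}),
        "employee.csv",
        CsvReadOptions::new().schema(&schema).has_header(true),
        None, // projection
        1,    // target_partitions
    )
    .await?
    .build()?;

    // After this commit: go through SessionContext::read_csv and
    // convert the resulting DataFrame back into a logical plan
    let ctx = SessionContext::new();
    let plan = ctx
        .read_csv(
            "employee.csv",
            CsvReadOptions::new().schema(&schema).has_header(true),
        )
        .await?
        .to_logical_plan()?;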

diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index e6d9e6289..1dd61a1e9 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -1016,19 +1016,16 @@ macro_rules! into_logical_plan {
 mod roundtrip_tests {
 
     use super::super::{super::error::Result, protobuf};
-    use crate::error::BallistaError;
     use crate::serde::{AsLogicalPlan, BallistaCodec};
     use async_trait::async_trait;
     use core::panic;
+    use datafusion::common::DFSchemaRef;
     use datafusion::logical_plan::source_as_provider;
     use datafusion::{
         arrow::datatypes::{DataType, Field, Schema},
         datafusion_data_access::{
             self,
-            object_store::{
-                local::LocalFileSystem, FileMetaStream, ListEntryStream, ObjectReader,
-                ObjectStore,
-            },
+            object_store::{FileMetaStream, ListEntryStream, ObjectReader, ObjectStore},
             SizedFile,
         },
         datasource::listing::ListingTable,
@@ -1142,26 +1139,11 @@ mod roundtrip_tests {
         let test_expr: Vec<Expr> =
             vec![col("c1") + col("c2"), Expr::Literal((4.0).into())];
 
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
-
         let plan = std::sync::Arc::new(
-            LogicalPlanBuilder::scan_csv(
-                Arc::new(LocalFileSystem {}),
-                "employee.csv",
-                CsvReadOptions::new().schema(&schema).has_header(true),
-                Some(vec![3, 4]),
-                4,
-            )
-            .await
-            .and_then(|plan| plan.sort(vec![col("salary")]))
-            .and_then(|plan| plan.build())
-            .map_err(BallistaError::DataFusionError)?,
+            test_scan_csv("employee.csv", Some(vec![3, 4]))
+                .await?
+                .sort(vec![col("salary")])?
+                .build()?,
         );
 
         for partition_count in test_partition_counts.iter() {
@@ -1198,13 +1180,7 @@ mod roundtrip_tests {
 
     #[test]
     fn roundtrip_create_external_table() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
+        let schema = test_schema();
 
         let df_schema_ref = schema.to_dfschema_ref()?;
 
@@ -1236,39 +1212,17 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_analyze() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
-
-        let verbose_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee.csv",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(true, true))
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
-
-        let plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee.csv",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(false, true))
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
+        let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+            .await?
+            .sort(vec![col("salary")])?
+            .explain(true, true)?
+            .build()?;
+
+        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+            .await?
+            .sort(vec![col("salary")])?
+            .explain(false, true)?
+            .build()?;
 
         roundtrip_test!(plan);
 
@@ -1279,39 +1233,17 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_explain() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
-
-        let verbose_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee.csv",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(true, false))
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
-
-        let plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee.csv",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.explain(false, false))
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
+        let verbose_plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+            .await?
+            .sort(vec![col("salary")])?
+            .explain(true, false)?
+            .build()?;
+
+        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+            .await?
+            .sort(vec![col("salary")])?
+            .explain(false, false)?
+            .build()?;
 
         roundtrip_test!(plan);
 
@@ -1322,36 +1254,14 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_join() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
-
-        let scan_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee1",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![0, 3, 4]),
-            4,
-        )
-        .await?
-        .build()
-        .map_err(BallistaError::DataFusionError)?;
-
-        let plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee2",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![0, 3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"])))
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
+        let scan_plan = test_scan_csv("employee1", Some(vec![0, 3, 4]))
+            .await?
+            .build()?;
+
+        let plan = test_scan_csv("employee2", Some(vec![0, 3, 4]))
+            .await?
+            .join(&scan_plan, JoinType::Inner, (vec!["id"], vec!["id"]))?
+            .build()?;
 
         roundtrip_test!(plan);
         Ok(())
@@ -1359,25 +1269,10 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_sort() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
-
-        let plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee.csv",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.sort(vec![col("salary")]))
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
+        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+            .await?
+            .sort(vec![col("salary")])?
+            .build()?;
         roundtrip_test!(plan);
 
         Ok(())
@@ -1385,15 +1280,11 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_empty_relation() -> Result<()> {
-        let plan_false = LogicalPlanBuilder::empty(false)
-            .build()
-            .map_err(BallistaError::DataFusionError)?;
+        let plan_false = LogicalPlanBuilder::empty(false).build()?;
 
         roundtrip_test!(plan_false);
 
-        let plan_true = LogicalPlanBuilder::empty(true)
-            .build()
-            .map_err(BallistaError::DataFusionError)?;
+        let plan_true = LogicalPlanBuilder::empty(true).build()?;
 
         roundtrip_test!(plan_true);
 
@@ -1402,31 +1293,17 @@ mod roundtrip_tests {
 
     #[tokio::test]
     async fn roundtrip_logical_plan() -> Result<()> {
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
-
-        let plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            "employee.csv",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.aggregate(vec![col("state")], vec![max(col("salary"))]))
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
+        let plan = test_scan_csv("employee.csv", Some(vec![3, 4]))
+            .await?
+            .aggregate(vec![col("state")], vec![max(col("salary"))])?
+            .build()?;
 
         roundtrip_test!(plan);
 
         Ok(())
     }
 
+    #[ignore] // see https://github.com/apache/arrow-datafusion/issues/2546
     #[tokio::test]
     async fn roundtrip_logical_plan_custom_ctx() -> Result<()> {
         let ctx = SessionContext::new();
@@ -1436,28 +1313,18 @@ mod roundtrip_tests {
         ctx.runtime_env()
             .register_object_store("test", custom_object_store.clone());
 
-        let (os, _) = ctx.runtime_env().object_store("test://foo.csv")?;
-
-        println!("Object Store {:?}", os);
+        let (os, uri) = ctx.runtime_env().object_store("test://foo.csv")?;
+        assert_eq!("TestObjectStore", &format!("{:?}", os));
+        assert_eq!("foo.csv", uri);
 
-        let schema = Schema::new(vec![
-            Field::new("id", DataType::Int32, false),
-            Field::new("first_name", DataType::Utf8, false),
-            Field::new("last_name", DataType::Utf8, false),
-            Field::new("state", DataType::Utf8, false),
-            Field::new("salary", DataType::Int32, false),
-        ]);
-
-        let plan = LogicalPlanBuilder::scan_csv(
-            custom_object_store.clone(),
-            "test://employee.csv",
-            CsvReadOptions::new().schema(&schema).has_header(true),
-            Some(vec![3, 4]),
-            4,
-        )
-        .await
-        .and_then(|plan| plan.build())
-        .map_err(BallistaError::DataFusionError)?;
+        let schema = test_schema();
+        let plan = ctx
+            .read_csv(
+                "test://employee.csv",
+                CsvReadOptions::new().schema(&schema).has_header(true),
+            )
+            .await?
+            .to_logical_plan()?;
 
         let proto: protobuf::LogicalPlanNode =
             protobuf::LogicalPlanNode::try_from_logical_plan(
@@ -1488,4 +1355,36 @@ mod roundtrip_tests {
 
         Ok(())
     }
+
+    fn test_schema() -> Schema {
+        Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("first_name", DataType::Utf8, false),
+            Field::new("last_name", DataType::Utf8, false),
+            Field::new("state", DataType::Utf8, false),
+            Field::new("salary", DataType::Int32, false),
+        ])
+    }
+
+    async fn test_scan_csv(
+        table_name: &str,
+        projection: Option<Vec<usize>>,
+    ) -> Result<LogicalPlanBuilder> {
+        let schema = test_schema();
+        let ctx = SessionContext::new();
+        let options = CsvReadOptions::new().schema(&schema);
+        let df = ctx.read_csv(table_name, options).await?;
+        let plan = match df.to_logical_plan()? {
+            LogicalPlan::TableScan(ref scan) => {
+                let mut scan = scan.clone();
+                scan.projection = projection;
+                let mut projected_schema = scan.projected_schema.as_ref().clone();
+                projected_schema = projected_schema.replace_qualifier(table_name);
+                scan.projected_schema = DFSchemaRef::new(projected_schema);
+                LogicalPlan::TableScan(scan)
+            }
+            _ => unimplemented!(),
+        };
+        Ok(LogicalPlanBuilder::from(plan))
+    }
 }
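Note that the `test_scan_csv` helper above mutates the `TableScan` fields directly so the serialized plans keep the projection indices and table-name qualifier that `scan_csv` used to produce. Ordinary code would not patch a scan this way; a hypothetical equivalent for the same two columns would project through the DataFrame API instead:

    // Hypothetical alternative (not used by these tests): select columns
    // by name. This adds a Projection node on top of an unprojected scan
    // rather than projecting the scan itself, which is why the round-trip
    // tests patch the TableScan fields instead.
    let df = ctx
        .read_csv("employee.csv", CsvReadOptions::new().schema(&schema))
        .await?
        .select_columns(&["state", "salary"])?;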
diff --git a/ballista/rust/core/src/serde/mod.rs b/ballista/rust/core/src/serde/mod.rs
index bc2d7ff6b..236e66c7d 100644
--- a/ballista/rust/core/src/serde/mod.rs
+++ b/ballista/rust/core/src/serde/mod.rs
@@ -351,14 +351,12 @@ fn str_to_byte(s: &str) -> Result<u8, BallistaError> {
 mod tests {
     use async_trait::async_trait;
     use datafusion::arrow::datatypes::SchemaRef;
-    use datafusion::datafusion_data_access::object_store::local::LocalFileSystem;
     use datafusion::error::DataFusionError;
     use datafusion::execution::context::{QueryPlanner, SessionState, TaskContext};
     use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
     use datafusion::logical_plan::plan::Extension;
     use datafusion::logical_plan::{
-        col, DFSchemaRef, Expr, FunctionRegistry, LogicalPlan, LogicalPlanBuilder,
-        UserDefinedLogicalNode,
+        col, DFSchemaRef, Expr, FunctionRegistry, LogicalPlan, UserDefinedLogicalNode,
     };
     use datafusion::physical_plan::expressions::PhysicalSortExpr;
     use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, ExtensionPlanner};
@@ -699,7 +697,6 @@ mod tests {
 
     #[tokio::test]
     async fn test_extension_plan() -> crate::error::Result<()> {
-        let store = Arc::new(LocalFileSystem {});
         let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
         let session_state =
             SessionState::with_config_rt(SessionConfig::new(), runtime.clone())
@@ -707,15 +704,13 @@ mod tests {
 
         let ctx = SessionContext::with_state(session_state);
 
-        let scan = LogicalPlanBuilder::scan_csv(
-            store,
-            "../../../datafusion/core/tests/customer.csv",
-            CsvReadOptions::default(),
-            None,
-            1,
-        )
-        .await?
-        .build()?;
+        let scan = ctx
+            .read_csv(
+                "../../../datafusion/core/tests/customer.csv",
+                CsvReadOptions::default(),
+            )
+            .await?
+            .to_logical_plan()?;
 
         let topk_plan = LogicalPlan::Extension(Extension {
             node: Arc::new(TopKPlanNode::new(3, scan, col("revenue"))),
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index 394d03755..ca3bca61d 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -571,18 +571,23 @@ impl SessionContext {
         let uri: String = uri.into();
         let (object_store, path) = self.runtime_env().object_store(&uri)?;
         let target_partitions = self.copied_config().target_partitions;
-        Ok(Arc::new(DataFrame::new(
-            self.state.clone(),
-            &LogicalPlanBuilder::scan_csv(
-                object_store,
-                path,
-                options,
-                None,
-                target_partitions,
-            )
-            .await?
-            .build()?,
-        )))
+        let path = path.to_string();
+        let listing_options = options.to_listing_options(target_partitions);
+        let resolved_schema = match options.schema {
+            Some(s) => Arc::new(s.to_owned()),
+            None => {
+                listing_options
+                    .infer_schema(Arc::clone(&object_store), &path)
+                    .await?
+            }
+        };
+        let config = ListingTableConfig::new(object_store, path.clone())
+            .with_listing_options(listing_options)
+            .with_schema(resolved_schema);
+        let provider = ListingTable::try_new(config)?;
+
+        let plan = LogicalPlanBuilder::scan(path, Arc::new(provider), None)?.build()?;
+        Ok(Arc::new(DataFrame::new(self.state.clone(), &plan)))
     }
 
     /// Creates a DataFrame for reading a Parquet data source.
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index 0b419b840..246eeb756 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -17,11 +17,7 @@
 
 //! This module provides a builder for creating LogicalPlans
 
-use crate::datasource::{
-    empty::EmptyTable,
-    listing::{ListingTable, ListingTableConfig},
-    MemTable, TableProvider,
-};
+use crate::datasource::{empty::EmptyTable, TableProvider};
 use crate::error::{DataFusionError, Result};
 use crate::logical_expr::ExprSchemable;
 use crate::logical_plan::plan::{
@@ -29,13 +25,8 @@ use crate::logical_plan::plan::{
     SubqueryAlias, TableScan, ToStringifiedPlan, Union, Window,
 };
 use crate::optimizer::utils;
-use crate::prelude::*;
 use crate::scalar::ScalarValue;
-use arrow::{
-    datatypes::{DataType, Schema, SchemaRef},
-    record_batch::RecordBatch,
-};
-use datafusion_data_access::object_store::ObjectStore;
+use arrow::datatypes::{DataType, Schema};
 use datafusion_expr::utils::{
     expand_qualified_wildcard, expand_wildcard, expr_to_columns,
 };
@@ -197,65 +188,6 @@ impl LogicalPlanBuilder {
         Ok(Self::from(LogicalPlan::Values(Values { schema, values })))
     }
 
-    /// Scan a memory data source
-    pub fn scan_memory(
-        partitions: Vec<Vec<RecordBatch>>,
-        schema: SchemaRef,
-        projection: Option<Vec<usize>>,
-    ) -> Result<Self> {
-        let provider = Arc::new(MemTable::try_new(schema, partitions)?);
-        Self::scan(UNNAMED_TABLE, provider, projection)
-    }
-
-    /// Scan a CSV data source
-    pub async fn scan_csv(
-        object_store: Arc<dyn ObjectStore>,
-        path: impl Into<String>,
-        options: CsvReadOptions<'_>,
-        projection: Option<Vec<usize>>,
-        target_partitions: usize,
-    ) -> Result<Self> {
-        let path = path.into();
-        Self::scan_csv_with_name(
-            object_store,
-            path.clone(),
-            options,
-            projection,
-            path,
-            target_partitions,
-        )
-        .await
-    }
-
-    /// Scan a CSV data source and register it with a given table name
-    pub async fn scan_csv_with_name(
-        object_store: Arc<dyn ObjectStore>,
-        path: impl Into<String>,
-        options: CsvReadOptions<'_>,
-        projection: Option<Vec<usize>>,
-        table_name: impl Into<String>,
-        target_partitions: usize,
-    ) -> Result<Self> {
-        let listing_options = options.to_listing_options(target_partitions);
-
-        let path: String = path.into();
-
-        let resolved_schema = match options.schema {
-            Some(s) => Arc::new(s.to_owned()),
-            None => {
-                listing_options
-                    .infer_schema(Arc::clone(&object_store), &path)
-                    .await?
-            }
-        };
-        let config = ListingTableConfig::new(object_store, path)
-            .with_listing_options(listing_options)
-            .with_schema(resolved_schema);
-        let provider = ListingTable::try_new(config)?;
-
-        Self::scan(table_name, Arc::new(provider), projection)
-    }
-
     /// Scan an empty data source, mainly used in tests
     pub fn scan_empty(
         name: Option<&str>,
@@ -1009,6 +941,7 @@ mod tests {
     use datafusion_expr::expr_fn::exists;
 
     use crate::logical_plan::StringifiedPlan;
+    use crate::prelude::*;
     use crate::test::test_table_scan_with_name;
 
     use super::super::{col, lit, sum};
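This file also removes `scan_memory`; callers now construct the `MemTable` provider themselves and pass it to the generic `scan`, as the projection test at the end of this diff does. A minimal sketch of that migration (assuming `partitions: Vec<Vec<RecordBatch>>` and a `SchemaRef` named `schema`):

    // Before (removed in this commit):
    let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?.build()?;

    // After: build the provider explicitly and use the generic scan
    let provider = Arc::new(MemTable::try_new(schema, partitions)?);
    let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?.build()?;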
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 47829ad79..77aee4102 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -1528,7 +1528,6 @@ fn tuple_err<T, R>(value: (Result<T>, Result<R>)) -> Result<(T, R)> {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::datafusion_data_access::object_store::local::LocalFileSystem;
     use crate::execution::context::TaskContext;
     use crate::execution::options::CsvReadOptions;
     use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
@@ -1536,7 +1535,7 @@ mod tests {
     use crate::physical_plan::{
         expressions, DisplayFormatType, Partitioning, Statistics,
     };
-    use crate::prelude::SessionConfig;
+    use crate::prelude::{SessionConfig, SessionContext};
     use crate::scalar::ScalarValue;
     use crate::{
         logical_plan::LogicalPlanBuilder, physical_plan::SendableRecordBatchStream,
@@ -1566,25 +1565,15 @@ mod tests {
 
     #[tokio::test]
     async fn test_all_operators() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
-        let logical_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            path,
-            options,
-            None,
-            1,
-        )
-        .await?
-        // filter clause needs the type coercion rule applied
-        .filter(col("c7").lt(lit(5_u8)))?
-        .project(vec![col("c1"), col("c2")])?
-        .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-        .sort(vec![col("c1").sort(true, true)])?
-        .limit(10)?
-        .build()?;
+        let logical_plan = test_csv_scan()
+            .await?
+            // filter clause needs the type coercion rule applied
+            .filter(col("c7").lt(lit(5_u8)))?
+            .project(vec![col("c1"), col("c2")])?
+            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+            .sort(vec![col("c1").sort(true, true)])?
+            .limit(10)?
+            .build()?;
 
         let plan = plan(&logical_plan).await?;
 
@@ -1618,20 +1607,10 @@ mod tests {
 
     #[tokio::test]
     async fn test_with_csv_plan() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
-        let logical_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            path,
-            options,
-            None,
-            1,
-        )
-        .await?
-        .filter(col("c7").lt(col("c12")))?
-        .build()?;
+        let logical_plan = test_csv_scan()
+            .await?
+            .filter(col("c7").lt(col("c12")))?
+            .build()?;
 
         let plan = plan(&logical_plan).await?;
 
@@ -1644,10 +1623,6 @@ mod tests {
 
     #[tokio::test]
     async fn errors() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
-
         let bool_expr = col("c1").eq(col("c1"));
         let cases = vec![
             // utf8 < u32
@@ -1666,15 +1641,7 @@ mod tests {
             col("c1").like(col("c2")),
         ];
         for case in cases {
-            let logical_plan = LogicalPlanBuilder::scan_csv(
-                Arc::new(LocalFileSystem {}),
-                &path,
-                options.clone(),
-                None,
-                1,
-            )
-            .await?
-            .project(vec![case.clone()]);
+            let logical_plan = test_csv_scan().await?.project(vec![case.clone()]);
             let message = format!(
                 "Expression {:?} expected to error due to impossible coercion",
                 case
@@ -1757,27 +1724,17 @@ mod tests {
 
     #[tokio::test]
     async fn in_list_types() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
-
         // expression: "a in ('a', 1)"
         let list = vec![
             Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
             Expr::Literal(ScalarValue::Int64(Some(1))),
         ];
-        let logical_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            &path,
-            options.clone(),
-            None,
-            1,
-        )
-        .await?
-        // filter clause needs the type coercion rule applied
-        .filter(col("c12").lt(lit(0.05)))?
-        .project(vec![col("c1").in_list(list, false)])?
-        .build()?;
+        let logical_plan = test_csv_scan()
+            .await?
+            // filter clause needs the type coercion rule applied
+            .filter(col("c12").lt(lit(0.05)))?
+            .project(vec![col("c1").in_list(list, false)])?
+            .build()?;
         let execution_plan = plan(&logical_plan).await?;
         // verify that the plan correctly adds cast from Int64(1) to Utf8
         let expected = "InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }], negated: false, set: None }";
@@ -1788,18 +1745,12 @@ mod tests {
             Expr::Literal(ScalarValue::Boolean(Some(true))),
             Expr::Literal(ScalarValue::Utf8(Some("a".to_string()))),
         ];
-        let logical_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            &path,
-            options.clone(),
-            None,
-            1,
-        )
-        .await?
-        // filter clause needs the type coercion rule applied
-        .filter(col("c12").lt(lit(0.05)))?
-        .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
-        .build()?;
+        let logical_plan = test_csv_scan()
+            .await?
+            // filter clause needs the type coercion rule applied
+            .filter(col("c12").lt(lit(0.05)))?
+            .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
+            .build()?;
         let execution_plan = plan(&logical_plan).await;
 
         let expected_error = "Unsupported CAST from Utf8 to Boolean";
@@ -1818,10 +1769,6 @@ mod tests {
 
     #[tokio::test]
     async fn in_set_test() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
-
         // OPTIMIZER_INSET_THRESHOLD = 10
         // expression: "a in ('a', 1, 2, ..30)"
         let mut list = vec![Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))];
@@ -1829,17 +1776,11 @@ mod tests {
             list.push(Expr::Literal(ScalarValue::Int64(Some(i))));
         }
 
-        let logical_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            &path,
-            options,
-            None,
-            1,
-        )
-        .await?
-        .filter(col("c12").lt(lit(0.05)))?
-        .project(vec![col("c1").in_list(list, false)])?
-        .build()?;
+        let logical_plan = test_csv_scan()
+            .await?
+            .filter(col("c12").lt(lit(0.05)))?
+            .project(vec![col("c1").in_list(list, false)])?
+            .build()?;
         let execution_plan = plan(&logical_plan).await?;
         let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(2) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(3) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(4)  [...]
         assert!(format!("{:?}", execution_plan).contains(expected));
@@ -1848,26 +1789,17 @@ mod tests {
 
     #[tokio::test]
     async fn in_set_null_test() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
         // test NULL
         let mut list = vec![Expr::Literal(ScalarValue::Int64(None))];
         for i in 1..31 {
             list.push(Expr::Literal(ScalarValue::Int64(Some(i))));
         }
 
-        let logical_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            &path,
-            options,
-            None,
-            1,
-        )
-        .await?
-        .filter(col("c12").lt(lit(0.05)))?
-        .project(vec![col("c1").in_list(list, false)])?
-        .build()?;
+        let logical_plan = test_csv_scan()
+            .await?
+            .filter(col("c12").lt(lit(0.05)))?
+            .project(vec![col("c1").in_list(list, false)])?
+            .build()?;
         let execution_plan = plan(&logical_plan).await?;
         let expected = "expr: [(InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [CastExpr { expr: Literal { value: Int64(NULL) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(2) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }, CastExpr { expr: Literal { value: Int64(3) }, cast_type: Utf8, cast_opti [...]
         assert!(format!("{:?}", execution_plan).contains(expected));
@@ -1876,21 +1808,10 @@ mod tests {
 
     #[tokio::test]
     async fn hash_agg_input_schema() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
-        let logical_plan = LogicalPlanBuilder::scan_csv_with_name(
-            Arc::new(LocalFileSystem {}),
-            &path,
-            options,
-            None,
-            "aggregate_test_100",
-            1,
-        )
-        .await?
-        .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-        .build()?;
+        let logical_plan = test_csv_scan_with_name("aggregate_test_100")
+            .await?
+            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+            .build()?;
 
         let execution_plan = plan(&logical_plan).await?;
         let final_hash_agg = execution_plan
@@ -1910,20 +1831,10 @@ mod tests {
 
     #[tokio::test]
     async fn hash_agg_group_by_partitioned() -> Result<()> {
-        let testdata = crate::test_util::arrow_test_data();
-        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
-
-        let options = CsvReadOptions::new().schema_infer_max_records(100);
-        let logical_plan = LogicalPlanBuilder::scan_csv(
-            Arc::new(LocalFileSystem {}),
-            &path,
-            options,
-            None,
-            1,
-        )
-        .await?
-        .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
-        .build()?;
+        let logical_plan = test_csv_scan()
+            .await?
+            .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
+            .build()?;
 
         let execution_plan = plan(&logical_plan).await?;
         let formatted = format!("{:?}", execution_plan);
@@ -2112,4 +2023,36 @@ mod tests {
             })))
         }
     }
+
+    async fn test_csv_scan_with_name(name: &str) -> Result<LogicalPlanBuilder> {
+        let ctx = SessionContext::new();
+        let testdata = crate::test_util::arrow_test_data();
+        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+        let options = CsvReadOptions::new().schema_infer_max_records(100);
+        let logical_plan = match ctx.read_csv(path, options).await?.to_logical_plan()? {
+            LogicalPlan::TableScan(ref scan) => {
+                let mut scan = scan.clone();
+                scan.table_name = name.to_string();
+                let new_schema = scan
+                    .projected_schema
+                    .as_ref()
+                    .clone()
+                    .replace_qualifier(name);
+                scan.projected_schema = Arc::new(new_schema);
+                LogicalPlan::TableScan(scan)
+            }
+            _ => unimplemented!(),
+        };
+        Ok(LogicalPlanBuilder::from(logical_plan))
+    }
+
+    async fn test_csv_scan() -> Result<LogicalPlanBuilder> {
+        let ctx = SessionContext::new();
+        let testdata = crate::test_util::arrow_test_data();
+        let path = format!("{}/csv/aggregate_test_100.csv", testdata);
+        let options = CsvReadOptions::new().schema_infer_max_records(100);
+        Ok(LogicalPlanBuilder::from(
+            ctx.read_csv(path, options).await?.to_logical_plan()?,
+        ))
+    }
 }
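Outside of tests, code that previously relied on `scan_csv_with_name` to get a named table can register the file with the context instead. A minimal sketch under that assumption (`register_csv` and `table` are existing SessionContext APIs, but this snippet is illustrative and not part of this commit):

    let ctx = SessionContext::new();
    ctx.register_csv("aggregate_test_100", &path, CsvReadOptions::new())
        .await?;
    let plan = ctx.table("aggregate_test_100")?.to_logical_plan()?;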
diff --git a/datafusion/core/tests/sql/projection.rs b/datafusion/core/tests/sql/projection.rs
index 25ff30f0e..0b7ce860e 100644
--- a/datafusion/core/tests/sql/projection.rs
+++ b/datafusion/core/tests/sql/projection.rs
@@ -237,7 +237,8 @@ async fn projection_on_memory_scan() -> Result<()> {
         ],
     )?]];
 
-    let plan = LogicalPlanBuilder::scan_memory(partitions, schema, None)?
+    let provider = Arc::new(MemTable::try_new(schema, partitions)?);
+    let plan = LogicalPlanBuilder::scan(UNNAMED_TABLE, provider, None)?
         .project(vec![col("b")])?
         .build()?;
     assert_fields_eq(&plan, vec!["b"]);