You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ak...@apache.org on 2023/12/18 10:55:54 UTC

(arrow-datafusion) branch main updated: Remove ListingTable and FileScanConfig Unbounded (#8540) (#8573)

This is an automated email from the ASF dual-hosted git repository.

akurmustafa pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a1e959d87a Remove ListingTable and FileScanConfig Unbounded (#8540) (#8573)
a1e959d87a is described below

commit a1e959d87a66da7060bd005b1993b824c0683a63
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Mon Dec 18 10:55:49 2023 +0000

    Remove ListingTable and FileScanConfig Unbounded (#8540) (#8573)
    
    * Remove ListingTable and FileScanConfig Unbounded (#8540)
    
    * Fix substrait
    
    * Fix logical conflicts
    
    * Add deleted tests as ignored
    
    ---------
    
    Co-authored-by: Mustafa Akur <mu...@synnada.ai>
---
 datafusion-examples/examples/csv_opener.rs         |   1 -
 datafusion-examples/examples/json_opener.rs        |   1 -
 datafusion/core/src/datasource/file_format/mod.rs  |   1 -
 .../core/src/datasource/file_format/options.rs     |  48 ++-----
 datafusion/core/src/datasource/listing/table.rs    | 152 ---------------------
 .../core/src/datasource/listing_table_factory.rs   |  16 +--
 .../src/datasource/physical_plan/arrow_file.rs     |   4 -
 .../core/src/datasource/physical_plan/avro.rs      |   7 -
 .../core/src/datasource/physical_plan/csv.rs       |   4 -
 .../datasource/physical_plan/file_scan_config.rs   |   3 -
 .../src/datasource/physical_plan/file_stream.rs    |   1 -
 .../core/src/datasource/physical_plan/json.rs      |   8 --
 .../core/src/datasource/physical_plan/mod.rs       |   4 -
 .../src/datasource/physical_plan/parquet/mod.rs    |   4 -
 datafusion/core/src/execution/context/mod.rs       |  11 +-
 .../combine_partial_final_agg.rs                   |   1 -
 .../src/physical_optimizer/enforce_distribution.rs |   5 -
 .../core/src/physical_optimizer/enforce_sorting.rs |  15 +-
 .../src/physical_optimizer/projection_pushdown.rs  |   2 -
 .../replace_with_order_preserving_variants.rs      |  92 ++++++-------
 .../core/src/physical_optimizer/test_utils.rs      |  24 ++--
 datafusion/core/src/test/mod.rs                    |   3 -
 datafusion/core/src/test_util/mod.rs               |  25 +---
 datafusion/core/src/test_util/parquet.rs           |   1 -
 datafusion/core/tests/parquet/custom_reader.rs     |   1 -
 datafusion/core/tests/parquet/page_pruning.rs      |   1 -
 datafusion/core/tests/parquet/schema_coercion.rs   |   2 -
 datafusion/core/tests/sql/joins.rs                 |  42 ++----
 datafusion/proto/src/physical_plan/from_proto.rs   |   1 -
 .../proto/tests/cases/roundtrip_physical_plan.rs   |   1 -
 datafusion/substrait/src/physical_plan/consumer.rs |   1 -
 .../tests/cases/roundtrip_physical_plan.rs         |   1 -
 32 files changed, 102 insertions(+), 381 deletions(-)

diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs
index 15fb07ded4..96753c8c52 100644
--- a/datafusion-examples/examples/csv_opener.rs
+++ b/datafusion-examples/examples/csv_opener.rs
@@ -67,7 +67,6 @@ async fn main() -> Result<()> {
         limit: Some(5),
         table_partition_cols: vec![],
         output_ordering: vec![],
-        infinite_source: false,
     };
 
     let result =
diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs
index 1a3dbe57be..ee33f969ca 100644
--- a/datafusion-examples/examples/json_opener.rs
+++ b/datafusion-examples/examples/json_opener.rs
@@ -70,7 +70,6 @@ async fn main() -> Result<()> {
         limit: Some(5),
         table_partition_cols: vec![],
         output_ordering: vec![],
-        infinite_source: false,
     };
 
     let result =
diff --git a/datafusion/core/src/datasource/file_format/mod.rs b/datafusion/core/src/datasource/file_format/mod.rs
index 7c2331548e..12c9fb91ad 100644
--- a/datafusion/core/src/datasource/file_format/mod.rs
+++ b/datafusion/core/src/datasource/file_format/mod.rs
@@ -165,7 +165,6 @@ pub(crate) mod test_util {
                     limit,
                     table_partition_cols: vec![],
                     output_ordering: vec![],
-                    infinite_source: false,
                 },
                 None,
             )
diff --git a/datafusion/core/src/datasource/file_format/options.rs b/datafusion/core/src/datasource/file_format/options.rs
index 4c7557a4a9..d389137785 100644
--- a/datafusion/core/src/datasource/file_format/options.rs
+++ b/datafusion/core/src/datasource/file_format/options.rs
@@ -21,7 +21,6 @@ use std::sync::Arc;
 
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use async_trait::async_trait;
-use datafusion_common::{plan_err, DataFusionError};
 
 use crate::datasource::file_format::arrow::ArrowFormat;
 use crate::datasource::file_format::file_compression_type::FileCompressionType;
@@ -72,8 +71,6 @@ pub struct CsvReadOptions<'a> {
     pub table_partition_cols: Vec<(String, DataType)>,
     /// File compression type
     pub file_compression_type: FileCompressionType,
-    /// Flag indicating whether this file may be unbounded (as in a FIFO file).
-    pub infinite: bool,
     /// Indicates how the file is sorted
     pub file_sort_order: Vec<Vec<Expr>>,
 }
@@ -97,7 +94,6 @@ impl<'a> CsvReadOptions<'a> {
             file_extension: DEFAULT_CSV_EXTENSION,
             table_partition_cols: vec![],
             file_compression_type: FileCompressionType::UNCOMPRESSED,
-            infinite: false,
             file_sort_order: vec![],
         }
     }
@@ -108,12 +104,6 @@ impl<'a> CsvReadOptions<'a> {
         self
     }
 
-    /// Configure mark_infinite setting
-    pub fn mark_infinite(mut self, infinite: bool) -> Self {
-        self.infinite = infinite;
-        self
-    }
-
     /// Specify delimiter to use for CSV read
     pub fn delimiter(mut self, delimiter: u8) -> Self {
         self.delimiter = delimiter;
@@ -324,8 +314,6 @@ pub struct AvroReadOptions<'a> {
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<(String, DataType)>,
-    /// Flag indicating whether this file may be unbounded (as in a FIFO file).
-    pub infinite: bool,
 }
 
 impl<'a> Default for AvroReadOptions<'a> {
@@ -334,7 +322,6 @@ impl<'a> Default for AvroReadOptions<'a> {
             schema: None,
             file_extension: DEFAULT_AVRO_EXTENSION,
             table_partition_cols: vec![],
-            infinite: false,
         }
     }
 }
@@ -349,12 +336,6 @@ impl<'a> AvroReadOptions<'a> {
         self
     }
 
-    /// Configure mark_infinite setting
-    pub fn mark_infinite(mut self, infinite: bool) -> Self {
-        self.infinite = infinite;
-        self
-    }
-
     /// Specify schema to use for AVRO read
     pub fn schema(mut self, schema: &'a Schema) -> Self {
         self.schema = Some(schema);
@@ -466,21 +447,17 @@ pub trait ReadOptions<'a> {
         state: SessionState,
         table_path: ListingTableUrl,
         schema: Option<&'a Schema>,
-        infinite: bool,
     ) -> Result<SchemaRef>
     where
         'a: 'async_trait,
     {
-        match (schema, infinite) {
-            (Some(s), _) => Ok(Arc::new(s.to_owned())),
-            (None, false) => Ok(self
-                .to_listing_options(config)
-                .infer_schema(&state, &table_path)
-                .await?),
-            (None, true) => {
-                plan_err!("Schema inference for infinite data sources is not supported.")
-            }
+        if let Some(s) = schema {
+            return Ok(Arc::new(s.to_owned()));
         }
+
+        self.to_listing_options(config)
+            .infer_schema(&state, &table_path)
+            .await
     }
 }
 
@@ -500,7 +477,6 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
             .with_file_sort_order(self.file_sort_order.clone())
-            .with_infinite_source(self.infinite)
     }
 
     async fn get_resolved_schema(
@@ -509,7 +485,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
         state: SessionState,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+        self._get_resolved_schema(config, state, table_path, self.schema)
             .await
     }
 }
@@ -535,7 +511,7 @@ impl ReadOptions<'_> for ParquetReadOptions<'_> {
         state: SessionState,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, false)
+        self._get_resolved_schema(config, state, table_path, self.schema)
             .await
     }
 }
@@ -551,7 +527,6 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
             .with_file_extension(self.file_extension)
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
-            .with_infinite_source(self.infinite)
             .with_file_sort_order(self.file_sort_order.clone())
     }
 
@@ -561,7 +536,7 @@ impl ReadOptions<'_> for NdJsonReadOptions<'_> {
         state: SessionState,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+        self._get_resolved_schema(config, state, table_path, self.schema)
             .await
     }
 }
@@ -575,7 +550,6 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
             .with_file_extension(self.file_extension)
             .with_target_partitions(config.target_partitions())
             .with_table_partition_cols(self.table_partition_cols.clone())
-            .with_infinite_source(self.infinite)
     }
 
     async fn get_resolved_schema(
@@ -584,7 +558,7 @@ impl ReadOptions<'_> for AvroReadOptions<'_> {
         state: SessionState,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, self.infinite)
+        self._get_resolved_schema(config, state, table_path, self.schema)
             .await
     }
 }
@@ -606,7 +580,7 @@ impl ReadOptions<'_> for ArrowReadOptions<'_> {
         state: SessionState,
         table_path: ListingTableUrl,
     ) -> Result<SchemaRef> {
-        self._get_resolved_schema(config, state, table_path, self.schema, false)
+        self._get_resolved_schema(config, state, table_path, self.schema)
             .await
     }
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 0ce1b43fe4..4c13d9d443 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -246,11 +246,6 @@ pub struct ListingOptions {
     ///       multiple equivalent orderings, the outer `Vec` will have a
     ///       single element.
     pub file_sort_order: Vec<Vec<Expr>>,
-    /// Infinite source means that the input is not guaranteed to end.
-    /// Currently, CSV, JSON, and AVRO formats are supported.
-    /// In order to support infinite inputs, DataFusion may adjust query
-    /// plans (e.g. joins) to run the given query in full pipelining mode.
-    pub infinite_source: bool,
     /// This setting when true indicates that the table is backed by a single file.
     /// Any inserts to the table may only append to this existing file.
     pub single_file: bool,
@@ -274,30 +269,11 @@ impl ListingOptions {
             collect_stat: true,
             target_partitions: 1,
             file_sort_order: vec![],
-            infinite_source: false,
             single_file: false,
             file_type_write_options: None,
         }
     }
 
-    /// Set unbounded assumption on [`ListingOptions`] and returns self.
-    ///
-    /// ```
-    /// use std::sync::Arc;
-    /// use datafusion::datasource::{listing::ListingOptions, file_format::csv::CsvFormat};
-    /// use datafusion::prelude::SessionContext;
-    /// let ctx = SessionContext::new();
-    /// let listing_options = ListingOptions::new(Arc::new(
-    ///     CsvFormat::default()
-    ///   )).with_infinite_source(true);
-    ///
-    /// assert_eq!(listing_options.infinite_source, true);
-    /// ```
-    pub fn with_infinite_source(mut self, infinite_source: bool) -> Self {
-        self.infinite_source = infinite_source;
-        self
-    }
-
     /// Set file extension on [`ListingOptions`] and returns self.
     ///
     /// ```
@@ -557,7 +533,6 @@ pub struct ListingTable {
     options: ListingOptions,
     definition: Option<String>,
     collected_statistics: FileStatisticsCache,
-    infinite_source: bool,
     constraints: Constraints,
     column_defaults: HashMap<String, Expr>,
 }
@@ -587,7 +562,6 @@ impl ListingTable {
         for (part_col_name, part_col_type) in &options.table_partition_cols {
             builder.push(Field::new(part_col_name, part_col_type.clone(), false));
         }
-        let infinite_source = options.infinite_source;
 
         let table = Self {
             table_paths: config.table_paths,
@@ -596,7 +570,6 @@ impl ListingTable {
             options,
             definition: None,
             collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
-            infinite_source,
             constraints: Constraints::empty(),
             column_defaults: HashMap::new(),
         };
@@ -729,7 +702,6 @@ impl TableProvider for ListingTable {
                     limit,
                     output_ordering: self.try_create_output_ordering()?,
                     table_partition_cols,
-                    infinite_source: self.infinite_source,
                 },
                 filters.as_ref(),
             )
@@ -943,7 +915,6 @@ impl ListingTable {
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
-    use std::fs::File;
 
     use super::*;
     #[cfg(feature = "parquet")]
@@ -955,7 +926,6 @@ mod tests {
     use crate::{
         assert_batches_eq,
         datasource::file_format::avro::AvroFormat,
-        execution::options::ReadOptions,
         logical_expr::{col, lit},
         test::{columns, object_store::register_test_store},
     };
@@ -967,37 +937,8 @@ mod tests {
     use datafusion_common::{assert_contains, GetExt, ScalarValue};
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
     use datafusion_physical_expr::PhysicalSortExpr;
-    use rstest::*;
     use tempfile::TempDir;
 
-    /// It creates dummy file and checks if it can create unbounded input executors.
-    async fn unbounded_table_helper(
-        file_type: FileType,
-        listing_option: ListingOptions,
-        infinite_data: bool,
-    ) -> Result<()> {
-        let ctx = SessionContext::new();
-        register_test_store(
-            &ctx,
-            &[(&format!("table/file{}", file_type.get_ext()), 100)],
-        );
-
-        let schema = Schema::new(vec![Field::new("a", DataType::Boolean, false)]);
-
-        let table_path = ListingTableUrl::parse("test:///table/").unwrap();
-        let config = ListingTableConfig::new(table_path)
-            .with_listing_options(listing_option)
-            .with_schema(Arc::new(schema));
-        // Create a table
-        let table = ListingTable::try_new(config)?;
-        // Create executor from table
-        let source_exec = table.scan(&ctx.state(), None, &[], None).await?;
-
-        assert_eq!(source_exec.unbounded_output(&[])?, infinite_data);
-
-        Ok(())
-    }
-
     #[tokio::test]
     async fn read_single_file() -> Result<()> {
         let ctx = SessionContext::new();
@@ -1205,99 +1146,6 @@ mod tests {
         Ok(())
     }
 
-    #[tokio::test]
-    async fn unbounded_csv_table_without_schema() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let file_path = tmp_dir.path().join("dummy.csv");
-        File::create(file_path)?;
-        let ctx = SessionContext::new();
-        let error = ctx
-            .register_csv(
-                "test",
-                tmp_dir.path().to_str().unwrap(),
-                CsvReadOptions::new().mark_infinite(true),
-            )
-            .await
-            .unwrap_err();
-        match error {
-            DataFusionError::Plan(_) => Ok(()),
-            val => Err(val),
-        }
-    }
-
-    #[tokio::test]
-    async fn unbounded_json_table_without_schema() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let file_path = tmp_dir.path().join("dummy.json");
-        File::create(file_path)?;
-        let ctx = SessionContext::new();
-        let error = ctx
-            .register_json(
-                "test",
-                tmp_dir.path().to_str().unwrap(),
-                NdJsonReadOptions::default().mark_infinite(true),
-            )
-            .await
-            .unwrap_err();
-        match error {
-            DataFusionError::Plan(_) => Ok(()),
-            val => Err(val),
-        }
-    }
-
-    #[tokio::test]
-    async fn unbounded_avro_table_without_schema() -> Result<()> {
-        let tmp_dir = TempDir::new()?;
-        let file_path = tmp_dir.path().join("dummy.avro");
-        File::create(file_path)?;
-        let ctx = SessionContext::new();
-        let error = ctx
-            .register_avro(
-                "test",
-                tmp_dir.path().to_str().unwrap(),
-                AvroReadOptions::default().mark_infinite(true),
-            )
-            .await
-            .unwrap_err();
-        match error {
-            DataFusionError::Plan(_) => Ok(()),
-            val => Err(val),
-        }
-    }
-
-    #[rstest]
-    #[tokio::test]
-    async fn unbounded_csv_table(
-        #[values(true, false)] infinite_data: bool,
-    ) -> Result<()> {
-        let config = CsvReadOptions::new().mark_infinite(infinite_data);
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        let listing_options = config.to_listing_options(&session_config);
-        unbounded_table_helper(FileType::CSV, listing_options, infinite_data).await
-    }
-
-    #[rstest]
-    #[tokio::test]
-    async fn unbounded_json_table(
-        #[values(true, false)] infinite_data: bool,
-    ) -> Result<()> {
-        let config = NdJsonReadOptions::default().mark_infinite(infinite_data);
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        let listing_options = config.to_listing_options(&session_config);
-        unbounded_table_helper(FileType::JSON, listing_options, infinite_data).await
-    }
-
-    #[rstest]
-    #[tokio::test]
-    async fn unbounded_avro_table(
-        #[values(true, false)] infinite_data: bool,
-    ) -> Result<()> {
-        let config = AvroReadOptions::default().mark_infinite(infinite_data);
-        let session_config = SessionConfig::new().with_target_partitions(1);
-        let listing_options = config.to_listing_options(&session_config);
-        unbounded_table_helper(FileType::AVRO, listing_options, infinite_data).await
-    }
-
     #[tokio::test]
     async fn test_assert_list_files_for_scan_grouping() -> Result<()> {
         // more expected partitions than files
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index a9d0c3a009..7c859ee988 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -133,21 +133,9 @@ impl TableProviderFactory for ListingTableFactory {
             (Some(schema), table_partition_cols)
         };
 
-        // look for 'infinite' as an option
-        let infinite_source = cmd.unbounded;
-
         let mut statement_options = StatementOptions::from(&cmd.options);
 
         // Extract ListingTable specific options if present or set default
-        let unbounded = if infinite_source {
-            statement_options.take_str_option("unbounded");
-            infinite_source
-        } else {
-            statement_options
-                .take_bool_option("unbounded")?
-                .unwrap_or(false)
-        };
-
         let single_file = statement_options
             .take_bool_option("single_file")?
             .unwrap_or(false);
@@ -159,6 +147,7 @@ impl TableProviderFactory for ListingTableFactory {
             }
         }
         statement_options.take_bool_option("create_local_path")?;
+        statement_options.take_str_option("unbounded");
 
         let file_type = file_format.file_type();
 
@@ -207,8 +196,7 @@ impl TableProviderFactory for ListingTableFactory {
             .with_table_partition_cols(table_partition_cols)
             .with_file_sort_order(cmd.order_exprs.clone())
             .with_single_file(single_file)
-            .with_write_options(file_type_writer_options)
-            .with_infinite_source(unbounded);
+            .with_write_options(file_type_writer_options);
 
         let resolved_schema = match provided_schema {
             None => options.infer_schema(state, &table_path).await?,
diff --git a/datafusion/core/src/datasource/physical_plan/arrow_file.rs b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
index 30b55db284..ae1e879d0d 100644
--- a/datafusion/core/src/datasource/physical_plan/arrow_file.rs
+++ b/datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -93,10 +93,6 @@ impl ExecutionPlan for ArrowExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
-    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
-        Ok(self.base_config().infinite_source)
-    }
-
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         self.projected_output_ordering
             .first()
diff --git a/datafusion/core/src/datasource/physical_plan/avro.rs b/datafusion/core/src/datasource/physical_plan/avro.rs
index 885b4c5d39..e448bf39f4 100644
--- a/datafusion/core/src/datasource/physical_plan/avro.rs
+++ b/datafusion/core/src/datasource/physical_plan/avro.rs
@@ -89,10 +89,6 @@ impl ExecutionPlan for AvroExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
-    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
-        Ok(self.base_config().infinite_source)
-    }
-
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         self.projected_output_ordering
             .first()
@@ -276,7 +272,6 @@ mod tests {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
         let mut results = avro_exec
@@ -348,7 +343,6 @@ mod tests {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
@@ -419,7 +413,6 @@ mod tests {
             limit: None,
             table_partition_cols: vec![Field::new("date", DataType::Utf8, false)],
             output_ordering: vec![],
-            infinite_source: false,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
 
diff --git a/datafusion/core/src/datasource/physical_plan/csv.rs b/datafusion/core/src/datasource/physical_plan/csv.rs
index 0eca37da13..0c34d22e9f 100644
--- a/datafusion/core/src/datasource/physical_plan/csv.rs
+++ b/datafusion/core/src/datasource/physical_plan/csv.rs
@@ -146,10 +146,6 @@ impl ExecutionPlan for CsvExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
-    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
-        Ok(self.base_config().infinite_source)
-    }
-
     /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't be
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         self.projected_output_ordering
diff --git a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
index 89694ff285..516755e4d2 100644
--- a/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_scan_config.rs
@@ -99,8 +99,6 @@ pub struct FileScanConfig {
     pub table_partition_cols: Vec<Field>,
     /// All equivalent lexicographical orderings that describe the schema.
     pub output_ordering: Vec<LexOrdering>,
-    /// Indicates whether this plan may produce an infinite stream of records.
-    pub infinite_source: bool,
 }
 
 impl FileScanConfig {
@@ -707,7 +705,6 @@ mod tests {
             statistics,
             table_partition_cols,
             output_ordering: vec![],
-            infinite_source: false,
         }
     }
 
diff --git a/datafusion/core/src/datasource/physical_plan/file_stream.rs b/datafusion/core/src/datasource/physical_plan/file_stream.rs
index a715f6e8e3..99fb088b66 100644
--- a/datafusion/core/src/datasource/physical_plan/file_stream.rs
+++ b/datafusion/core/src/datasource/physical_plan/file_stream.rs
@@ -667,7 +667,6 @@ mod tests {
                 limit: self.limit,
                 table_partition_cols: vec![],
                 output_ordering: vec![],
-                infinite_source: false,
             };
             let metrics_set = ExecutionPlanMetricsSet::new();
             let file_stream = FileStream::new(&config, 0, self.opener, &metrics_set)
diff --git a/datafusion/core/src/datasource/physical_plan/json.rs b/datafusion/core/src/datasource/physical_plan/json.rs
index 9c3b523a65..c74fd13e77 100644
--- a/datafusion/core/src/datasource/physical_plan/json.rs
+++ b/datafusion/core/src/datasource/physical_plan/json.rs
@@ -110,10 +110,6 @@ impl ExecutionPlan for NdJsonExec {
         Partitioning::UnknownPartitioning(self.base_config.file_groups.len())
     }
 
-    fn unbounded_output(&self, _: &[bool]) -> Result<bool> {
-        Ok(self.base_config.infinite_source)
-    }
-
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
         self.projected_output_ordering
             .first()
@@ -462,7 +458,6 @@ mod tests {
                 limit: Some(3),
                 table_partition_cols: vec![],
                 output_ordering: vec![],
-                infinite_source: false,
             },
             file_compression_type.to_owned(),
         );
@@ -541,7 +536,6 @@ mod tests {
                 limit: Some(3),
                 table_partition_cols: vec![],
                 output_ordering: vec![],
-                infinite_source: false,
             },
             file_compression_type.to_owned(),
         );
@@ -589,7 +583,6 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: vec![],
-                infinite_source: false,
             },
             file_compression_type.to_owned(),
         );
@@ -642,7 +635,6 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: vec![],
-                infinite_source: false,
             },
             file_compression_type.to_owned(),
         );
diff --git a/datafusion/core/src/datasource/physical_plan/mod.rs b/datafusion/core/src/datasource/physical_plan/mod.rs
index 8e4dd5400b..9d1c373aee 100644
--- a/datafusion/core/src/datasource/physical_plan/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/mod.rs
@@ -133,10 +133,6 @@ impl DisplayAs for FileScanConfig {
             write!(f, ", limit={limit}")?;
         }
 
-        if self.infinite_source {
-            write!(f, ", infinite_source=true")?;
-        }
-
         if let Some(ordering) = orderings.first() {
             if !ordering.is_empty() {
                 let start = if orderings.len() == 1 {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 2b10b05a27..ade149da69 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -882,7 +882,6 @@ mod tests {
                     limit: None,
                     table_partition_cols: vec![],
                     output_ordering: vec![],
-                    infinite_source: false,
                 },
                 predicate,
                 None,
@@ -1539,7 +1538,6 @@ mod tests {
                     limit: None,
                     table_partition_cols: vec![],
                     output_ordering: vec![],
-                    infinite_source: false,
                 },
                 None,
                 None,
@@ -1654,7 +1652,6 @@ mod tests {
                     ),
                 ],
                 output_ordering: vec![],
-                infinite_source: false,
             },
             None,
             None,
@@ -1718,7 +1715,6 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: vec![],
-                infinite_source: false,
             },
             None,
             None,
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 58a4f08341..8916fa814a 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -964,14 +964,9 @@ impl SessionContext {
         sql_definition: Option<String>,
     ) -> Result<()> {
         let table_path = ListingTableUrl::parse(table_path)?;
-        let resolved_schema = match (provided_schema, options.infinite_source) {
-            (Some(s), _) => s,
-            (None, false) => options.infer_schema(&self.state(), &table_path).await?,
-            (None, true) => {
-                return plan_err!(
-                    "Schema inference for infinite data sources is not supported."
-                )
-            }
+        let resolved_schema = match provided_schema {
+            Some(s) => s,
+            None => options.infer_schema(&self.state(), &table_path).await?,
         };
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(options)
diff --git a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
index c50ea36b68..7359a64630 100644
--- a/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
+++ b/datafusion/core/src/physical_optimizer/combine_partial_final_agg.rs
@@ -257,7 +257,6 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: vec![],
-                infinite_source: false,
             },
             None,
             None,
diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 099759741a..0aef126578 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -1775,7 +1775,6 @@ pub(crate) mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering,
-                infinite_source: false,
             },
             None,
             None,
@@ -1803,7 +1802,6 @@ pub(crate) mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering,
-                infinite_source: false,
             },
             None,
             None,
@@ -1825,7 +1823,6 @@ pub(crate) mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering,
-                infinite_source: false,
             },
             false,
             b',',
@@ -1856,7 +1853,6 @@ pub(crate) mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering,
-                infinite_source: false,
             },
             false,
             b',',
@@ -3957,7 +3953,6 @@ pub(crate) mod tests {
                         limit: None,
                         table_partition_cols: vec![],
                         output_ordering: vec![],
-                        infinite_source: false,
                     },
                     false,
                     b',',
diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
index 277404b301..c0e9b834e6 100644
--- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -2117,7 +2117,7 @@ mod tests {
     async fn test_with_lost_ordering_bounded() -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, false);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec(source);
         let repartition_hash = Arc::new(RepartitionExec::try_new(
             repartition_rr,
@@ -2141,10 +2141,11 @@ mod tests {
     }
 
     #[tokio::test]
+    #[ignore]
     async fn test_with_lost_ordering_unbounded() -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec(source);
         let repartition_hash = Arc::new(RepartitionExec::try_new(
             repartition_rr,
@@ -2171,10 +2172,12 @@ mod tests {
     }
 
     #[tokio::test]
+    #[ignore]
     async fn test_with_lost_ordering_unbounded_parallelize_off() -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        // Make source unbounded
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec(source);
         let repartition_hash = Arc::new(RepartitionExec::try_new(
             repartition_rr,
@@ -2203,7 +2206,7 @@ mod tests {
     async fn test_do_not_pushdown_through_spm() -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs.clone(), false);
+        let source = csv_exec_sorted(&schema, sort_exprs.clone());
         let repartition_rr = repartition_exec(source);
         let spm = sort_preserving_merge_exec(sort_exprs, repartition_rr);
         let physical_plan = sort_exec(vec![sort_expr("b", &schema)], spm);
@@ -2224,7 +2227,7 @@ mod tests {
     async fn test_pushdown_through_spm() -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs.clone(), false);
+        let source = csv_exec_sorted(&schema, sort_exprs.clone());
         let repartition_rr = repartition_exec(source);
         let spm = sort_preserving_merge_exec(sort_exprs, repartition_rr);
         let physical_plan = sort_exec(
@@ -2252,7 +2255,7 @@ mod tests {
     async fn test_window_multi_layer_requirement() -> Result<()> {
         let schema = create_test_schema3()?;
         let sort_exprs = vec![sort_expr("a", &schema), sort_expr("b", &schema)];
-        let source = csv_exec_sorted(&schema, vec![], false);
+        let source = csv_exec_sorted(&schema, vec![]);
         let sort = sort_exec(sort_exprs.clone(), source);
         let repartition = repartition_exec(sort);
         let repartition = spr_repartition_exec(repartition);
diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 664afbe822..7e1312dad2 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -1541,7 +1541,6 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: vec![vec![]],
-                infinite_source: false,
             },
             false,
             0,
@@ -1568,7 +1567,6 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: vec![vec![]],
-                infinite_source: false,
             },
             false,
             0,
diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index af45df7d84..41f2b39978 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -350,7 +350,7 @@ mod tests {
     async fn test_replace_multiple_input_repartition_1() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
         let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
 
@@ -362,15 +362,15 @@ mod tests {
             "  SortExec: expr=[a@0 ASC NULLS LAST]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -378,7 +378,7 @@ mod tests {
     async fn test_with_inter_children_change_only() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr_default("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -408,7 +408,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
         ];
 
         let expected_optimized = [
@@ -419,9 +419,9 @@ mod tests {
             "        SortPreservingMergeExec: [a@0 ASC]",
             "          RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC], has_header=true",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -429,7 +429,7 @@ mod tests {
     async fn test_replace_multiple_input_repartition_2() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let filter = filter_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(filter);
@@ -444,16 +444,16 @@ mod tests {
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -461,7 +461,7 @@ mod tests {
     async fn test_replace_multiple_input_repartition_with_extra_steps() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -478,7 +478,7 @@ mod tests {
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -486,9 +486,9 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -496,7 +496,7 @@ mod tests {
     async fn test_replace_multiple_input_repartition_with_extra_steps_2() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
         let repartition_hash = repartition_exec_hash(coalesce_batches_exec_1);
@@ -516,7 +516,7 @@ mod tests {
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          CoalesceBatchesExec: target_batch_size=8192",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -525,9 +525,9 @@ mod tests {
             "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "        CoalesceBatchesExec: target_batch_size=8192",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -535,7 +535,7 @@ mod tests {
     async fn test_not_replacing_when_no_need_to_preserve_sorting() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -550,7 +550,7 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "CoalescePartitionsExec",
@@ -558,7 +558,7 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -568,7 +568,7 @@ mod tests {
     async fn test_with_multiple_replacable_repartitions() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let filter = filter_exec(repartition_hash);
@@ -587,7 +587,7 @@ mod tests {
             "        FilterExec: c@1 > 3",
             "          RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -596,9 +596,9 @@ mod tests {
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -606,7 +606,7 @@ mod tests {
     async fn test_not_replace_with_different_orderings() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let sort = sort_exec(
@@ -625,14 +625,14 @@ mod tests {
             "  SortExec: expr=[c@1 ASC]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [c@1 ASC]",
             "  SortExec: expr=[c@1 ASC]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -642,7 +642,7 @@ mod tests {
     async fn test_with_lost_ordering() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -654,15 +654,15 @@ mod tests {
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         let expected_optimized = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -670,7 +670,7 @@ mod tests {
     async fn test_with_lost_and_kept_ordering() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, true);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
         let coalesce_partitions = coalesce_partitions_exec(repartition_hash);
@@ -700,7 +700,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
 
         let expected_optimized = [
@@ -712,9 +712,9 @@ mod tests {
             "          CoalescePartitionsExec",
             "            RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "                CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "                CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
-        assert_optimized!(expected_input, expected_optimized, physical_plan);
+        assert_optimized!(expected_input, expected_optimized, physical_plan, true);
         Ok(())
     }
 
@@ -723,14 +723,14 @@ mod tests {
         let schema = create_test_schema()?;
 
         let left_sort_exprs = vec![sort_expr("a", &schema)];
-        let left_source = csv_exec_sorted(&schema, left_sort_exprs, true);
+        let left_source = csv_exec_sorted(&schema, left_sort_exprs);
         let left_repartition_rr = repartition_exec_round_robin(left_source);
         let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
         let left_coalesce_partitions =
             Arc::new(CoalesceBatchesExec::new(left_repartition_hash, 4096));
 
         let right_sort_exprs = vec![sort_expr("a", &schema)];
-        let right_source = csv_exec_sorted(&schema, right_sort_exprs, true);
+        let right_source = csv_exec_sorted(&schema, right_sort_exprs);
         let right_repartition_rr = repartition_exec_round_robin(right_source);
         let right_repartition_hash = repartition_exec_hash(right_repartition_rr);
         let right_coalesce_partitions =
@@ -756,11 +756,11 @@ mod tests {
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
 
         let expected_optimized = [
@@ -770,11 +770,11 @@ mod tests {
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
         ];
         assert_optimized!(expected_input, expected_optimized, physical_plan);
         Ok(())
@@ -784,7 +784,7 @@ mod tests {
     async fn test_with_bounded_input() -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
-        let source = csv_exec_sorted(&schema, sort_exprs, false);
+        let source = csv_exec_sorted(&schema, sort_exprs);
         let repartition = repartition_exec_hash(repartition_exec_round_robin(source));
         let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
 
@@ -931,7 +931,6 @@ mod tests {
     fn csv_exec_sorted(
         schema: &SchemaRef,
         sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
-        infinite_source: bool,
     ) -> Arc<dyn ExecutionPlan> {
         let sort_exprs = sort_exprs.into_iter().collect();
         let projection: Vec<usize> = vec![0, 2, 3];
@@ -949,7 +948,6 @@ mod tests {
                 limit: None,
                 table_partition_cols: vec![],
                 output_ordering: vec![sort_exprs],
-                infinite_source,
             },
             true,
             0,
diff --git a/datafusion/core/src/physical_optimizer/test_utils.rs b/datafusion/core/src/physical_optimizer/test_utils.rs
index 678dc1f373..6e14cca21f 100644
--- a/datafusion/core/src/physical_optimizer/test_utils.rs
+++ b/datafusion/core/src/physical_optimizer/test_utils.rs
@@ -45,6 +45,7 @@ use datafusion_expr::{AggregateFunction, WindowFrame, WindowFunction};
 use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
 
+use crate::datasource::stream::{StreamConfig, StreamTable};
 use async_trait::async_trait;
 
 async fn register_current_csv(
@@ -54,14 +55,19 @@ async fn register_current_csv(
 ) -> Result<()> {
     let testdata = crate::test_util::arrow_test_data();
     let schema = crate::test_util::aggr_test_schema();
-    ctx.register_csv(
-        table_name,
-        &format!("{testdata}/csv/aggregate_test_100.csv"),
-        CsvReadOptions::new()
-            .schema(&schema)
-            .mark_infinite(infinite),
-    )
-    .await?;
+    let path = format!("{testdata}/csv/aggregate_test_100.csv");
+
+    match infinite {
+        true => {
+            let config = StreamConfig::new_file(schema, path.into());
+            ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
+        }
+        false => {
+            ctx.register_csv(table_name, &path, CsvReadOptions::new().schema(&schema))
+                .await?;
+        }
+    }
+
     Ok(())
 }
 
@@ -272,7 +278,6 @@ pub fn parquet_exec(schema: &SchemaRef) -> Arc<ParquetExec> {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         },
         None,
         None,
@@ -296,7 +301,6 @@ pub fn parquet_exec_sorted(
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![sort_exprs],
-            infinite_source: false,
         },
         None,
         None,
diff --git a/datafusion/core/src/test/mod.rs b/datafusion/core/src/test/mod.rs
index aad5c19044..8770c0c423 100644
--- a/datafusion/core/src/test/mod.rs
+++ b/datafusion/core/src/test/mod.rs
@@ -203,7 +203,6 @@ pub fn partitioned_csv_config(
         limit: None,
         table_partition_cols: vec![],
         output_ordering: vec![],
-        infinite_source: false,
     })
 }
 
@@ -277,7 +276,6 @@ fn make_decimal() -> RecordBatch {
 pub fn csv_exec_sorted(
     schema: &SchemaRef,
     sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
-    infinite_source: bool,
 ) -> Arc<dyn ExecutionPlan> {
     let sort_exprs = sort_exprs.into_iter().collect();
 
@@ -291,7 +289,6 @@ pub fn csv_exec_sorted(
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![sort_exprs],
-            infinite_source,
         },
         false,
         0,
diff --git a/datafusion/core/src/test_util/mod.rs b/datafusion/core/src/test_util/mod.rs
index c6b43de0c1..282b0f7079 100644
--- a/datafusion/core/src/test_util/mod.rs
+++ b/datafusion/core/src/test_util/mod.rs
@@ -36,7 +36,6 @@ use crate::datasource::provider::TableProviderFactory;
 use crate::datasource::{empty::EmptyTable, provider_as_source, TableProvider};
 use crate::error::Result;
 use crate::execution::context::{SessionState, TaskContext};
-use crate::execution::options::ReadOptions;
 use crate::logical_expr::{LogicalPlanBuilder, UNNAMED_TABLE};
 use crate::physical_plan::{
     DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, RecordBatchStream,
@@ -58,6 +57,7 @@ use futures::Stream;
 pub use datafusion_common::test_util::parquet_test_data;
 pub use datafusion_common::test_util::{arrow_test_data, get_data_dir};
 
+use crate::datasource::stream::{StreamConfig, StreamTable};
 pub use datafusion_common::{assert_batches_eq, assert_batches_sorted_eq};
 
 /// Scan an empty data source, mainly used in tests
@@ -342,30 +342,17 @@ impl RecordBatchStream for UnboundedStream {
 }
 
 /// This function creates an unbounded sorted file for testing purposes.
-pub async fn register_unbounded_file_with_ordering(
+pub fn register_unbounded_file_with_ordering(
     ctx: &SessionContext,
     schema: SchemaRef,
     file_path: &Path,
     table_name: &str,
     file_sort_order: Vec<Vec<Expr>>,
-    with_unbounded_execution: bool,
 ) -> Result<()> {
-    // Mark infinite and provide schema:
-    let fifo_options = CsvReadOptions::new()
-        .schema(schema.as_ref())
-        .mark_infinite(with_unbounded_execution);
-    // Get listing options:
-    let options_sort = fifo_options
-        .to_listing_options(&ctx.copied_config())
-        .with_file_sort_order(file_sort_order);
+    let config =
+        StreamConfig::new_file(schema, file_path.into()).with_order(file_sort_order);
+
     // Register table:
-    ctx.register_listing_table(
-        table_name,
-        file_path.as_os_str().to_str().unwrap(),
-        options_sort,
-        Some(schema),
-        None,
-    )
-    .await?;
+    ctx.register_table(table_name, Arc::new(StreamTable::new(Arc::new(config))))?;
     Ok(())
 }
diff --git a/datafusion/core/src/test_util/parquet.rs b/datafusion/core/src/test_util/parquet.rs
index f3c0d2987a..336a680463 100644
--- a/datafusion/core/src/test_util/parquet.rs
+++ b/datafusion/core/src/test_util/parquet.rs
@@ -156,7 +156,6 @@ impl TestParquetFile {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         };
 
         let df_schema = self.schema.clone().to_dfschema_ref()?;
diff --git a/datafusion/core/tests/parquet/custom_reader.rs b/datafusion/core/tests/parquet/custom_reader.rs
index 3752d42dbf..e76b201e02 100644
--- a/datafusion/core/tests/parquet/custom_reader.rs
+++ b/datafusion/core/tests/parquet/custom_reader.rs
@@ -85,7 +85,6 @@ async fn route_data_access_ops_to_parquet_file_reader_factory() {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         },
         None,
         None,
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index e1e8b8e66e..23a56bc821 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -81,7 +81,6 @@ async fn get_parquet_exec(state: &SessionState, filter: Expr) -> ParquetExec {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         },
         Some(predicate),
         None,
diff --git a/datafusion/core/tests/parquet/schema_coercion.rs b/datafusion/core/tests/parquet/schema_coercion.rs
index 25c62f18f5..00f3eada49 100644
--- a/datafusion/core/tests/parquet/schema_coercion.rs
+++ b/datafusion/core/tests/parquet/schema_coercion.rs
@@ -69,7 +69,6 @@ async fn multi_parquet_coercion() {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         },
         None,
         None,
@@ -133,7 +132,6 @@ async fn multi_parquet_coercion_projection() {
             limit: None,
             table_partition_cols: vec![],
             output_ordering: vec![],
-            infinite_source: false,
         },
         None,
         None,
diff --git a/datafusion/core/tests/sql/joins.rs b/datafusion/core/tests/sql/joins.rs
index 528bde6323..d1f270b540 100644
--- a/datafusion/core/tests/sql/joins.rs
+++ b/datafusion/core/tests/sql/joins.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use datafusion::datasource::stream::{StreamConfig, StreamTable};
 use datafusion::test_util::register_unbounded_file_with_ordering;
 
 use super::*;
@@ -105,9 +106,7 @@ async fn join_change_in_planner() -> Result<()> {
         &left_file_path,
         "left",
         file_sort_order.clone(),
-        true,
-    )
-    .await?;
+    )?;
     let right_file_path = tmp_dir.path().join("right.csv");
     File::create(right_file_path.clone()).unwrap();
     register_unbounded_file_with_ordering(
@@ -116,9 +115,7 @@ async fn join_change_in_planner() -> Result<()> {
         &right_file_path,
         "right",
         file_sort_order,
-        true,
-    )
-    .await?;
+    )?;
     let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
     let dataframe = ctx.sql(sql).await?;
     let physical_plan = dataframe.create_physical_plan().await?;
@@ -160,20 +157,13 @@ async fn join_change_in_planner_without_sort() -> Result<()> {
         Field::new("a1", DataType::UInt32, false),
         Field::new("a2", DataType::UInt32, false),
     ]));
-    ctx.register_csv(
-        "left",
-        left_file_path.as_os_str().to_str().unwrap(),
-        CsvReadOptions::new().schema(&schema).mark_infinite(true),
-    )
-    .await?;
+    let left = StreamConfig::new_file(schema.clone(), left_file_path);
+    ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
+
     let right_file_path = tmp_dir.path().join("right.csv");
     File::create(right_file_path.clone())?;
-    ctx.register_csv(
-        "right",
-        right_file_path.as_os_str().to_str().unwrap(),
-        CsvReadOptions::new().schema(&schema).mark_infinite(true),
-    )
-    .await?;
+    let right = StreamConfig::new_file(schema, right_file_path);
+    ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
     let sql = "SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10";
     let dataframe = ctx.sql(sql).await?;
     let physical_plan = dataframe.create_physical_plan().await?;
@@ -217,20 +207,12 @@ async fn join_change_in_planner_without_sort_not_allowed() -> Result<()> {
         Field::new("a1", DataType::UInt32, false),
         Field::new("a2", DataType::UInt32, false),
     ]));
-    ctx.register_csv(
-        "left",
-        left_file_path.as_os_str().to_str().unwrap(),
-        CsvReadOptions::new().schema(&schema).mark_infinite(true),
-    )
-    .await?;
+    let left = StreamConfig::new_file(schema.clone(), left_file_path);
+    ctx.register_table("left", Arc::new(StreamTable::new(Arc::new(left))))?;
     let right_file_path = tmp_dir.path().join("right.csv");
     File::create(right_file_path.clone())?;
-    ctx.register_csv(
-        "right",
-        right_file_path.as_os_str().to_str().unwrap(),
-        CsvReadOptions::new().schema(&schema).mark_infinite(true),
-    )
-    .await?;
+    let right = StreamConfig::new_file(schema.clone(), right_file_path);
+    ctx.register_table("right", Arc::new(StreamTable::new(Arc::new(right))))?;
     let df = ctx.sql("SELECT t1.a1, t1.a2, t2.a1, t2.a2 FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a1 > t2.a1 + 3 AND t1.a1 < t2.a1 + 10").await?;
     match df.create_physical_plan().await {
         Ok(_) => panic!("Expecting error."),
diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs
index dcebfbf2da..5c0ef615ca 100644
--- a/datafusion/proto/src/physical_plan/from_proto.rs
+++ b/datafusion/proto/src/physical_plan/from_proto.rs
@@ -526,7 +526,6 @@ pub fn parse_protobuf_file_scan_config(
         limit: proto.limit.as_ref().map(|sl| sl.limit as usize),
         table_partition_cols,
         output_ordering,
-        infinite_source: false,
     })
 }
 
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 4a512413e7..9a9827f2a0 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -492,7 +492,6 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
         limit: None,
         table_partition_cols: vec![],
         output_ordering: vec![],
-        infinite_source: false,
     };
 
     let predicate = Arc::new(BinaryExpr::new(
diff --git a/datafusion/substrait/src/physical_plan/consumer.rs b/datafusion/substrait/src/physical_plan/consumer.rs
index 942798173e..3098dc386e 100644
--- a/datafusion/substrait/src/physical_plan/consumer.rs
+++ b/datafusion/substrait/src/physical_plan/consumer.rs
@@ -112,7 +112,6 @@ pub async fn from_substrait_rel(
                         limit: None,
                         table_partition_cols: vec![],
                         output_ordering: vec![],
-                        infinite_source: false,
                     };
 
                     if let Some(MaskExpression { select, .. }) = &read.projection {
diff --git a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
index b64dd2c138..e5af3f94cc 100644
--- a/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_physical_plan.rs
@@ -49,7 +49,6 @@ async fn parquet_exec() -> Result<()> {
         limit: None,
         table_partition_cols: vec![],
         output_ordering: vec![],
-        infinite_source: false,
     };
     let parquet_exec: Arc<dyn ExecutionPlan> =
         Arc::new(ParquetExec::new(scan_config, None, None));