You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/11/15 12:48:33 UTC

(arrow-datafusion) branch main updated: Implement StreamTable and StreamTableProvider (#7994) (#8021)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 020b8fc761 Implement StreamTable and StreamTableProvider (#7994) (#8021)
020b8fc761 is described below

commit 020b8fc7619cfa392638da76456331b034479874
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Nov 15 12:48:27 2023 +0000

    Implement StreamTable and StreamTableProvider (#7994) (#8021)
    
    * Implement FIFO using extension points (#7994)
    
    * Clippy
    
    * Rename to StreamTable and make public
    
    * Add StreamEncoding
    
    * Rework sort order
    
    * Fix logical conflicts
    
    * Format
    
    * Add DefaultTableProvider
    
    * Fix doc
    
    * Fix project sort keys and CSV headers
    
    * Respect batch size on read
    
    * Tests are updated
    
    * Resolving clippy
    
    ---------
    
    Co-authored-by: metesynnada <10...@users.noreply.github.com>
---
 datafusion/core/src/datasource/listing/table.rs    |  37 +--
 .../core/src/datasource/listing_table_factory.rs   |   9 +-
 datafusion/core/src/datasource/mod.rs              |  44 +++
 datafusion/core/src/datasource/provider.rs         |  40 +++
 datafusion/core/src/datasource/stream.rs           | 326 +++++++++++++++++++++
 datafusion/core/src/execution/context/mod.rs       |  14 +-
 datafusion/core/tests/fifo.rs                      | 226 +++++++-------
 datafusion/physical-plan/src/streaming.rs          |  21 +-
 datafusion/sqllogictest/test_files/ddl.slt         |   2 +-
 datafusion/sqllogictest/test_files/groupby.slt     |  10 +-
 datafusion/sqllogictest/test_files/window.slt      |  14 +-
 11 files changed, 553 insertions(+), 190 deletions(-)

diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index d26d417bd8..c22eb58e88 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -26,6 +26,7 @@ use super::PartitionedFile;
 #[cfg(feature = "parquet")]
 use crate::datasource::file_format::parquet::ParquetFormat;
 use crate::datasource::{
+    create_ordering,
     file_format::{
         arrow::ArrowFormat,
         avro::AvroFormat,
@@ -40,7 +41,6 @@ use crate::datasource::{
     TableProvider, TableType,
 };
 use crate::logical_expr::TableProviderFilterPushDown;
-use crate::physical_plan;
 use crate::{
     error::{DataFusionError, Result},
     execution::context::SessionState,
@@ -48,7 +48,6 @@ use crate::{
     physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
 };
 
-use arrow::compute::SortOptions;
 use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
 use arrow_schema::Schema;
 use datafusion_common::{
@@ -57,10 +56,9 @@ use datafusion_common::{
 };
 use datafusion_execution::cache::cache_manager::FileStatisticsCache;
 use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
-use datafusion_expr::expr::Sort;
 use datafusion_optimizer::utils::conjunction;
 use datafusion_physical_expr::{
-    create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
+    create_physical_expr, LexOrdering, PhysicalSortRequirement,
 };
 
 use async_trait::async_trait;
@@ -677,34 +675,7 @@ impl ListingTable {
 
     /// If file_sort_order is specified, creates the appropriate physical expressions
     fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
-        let mut all_sort_orders = vec![];
-
-        for exprs in &self.options.file_sort_order {
-            // Construct PhsyicalSortExpr objects from Expr objects:
-            let sort_exprs = exprs
-                .iter()
-                .map(|expr| {
-                    if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
-                        if let Expr::Column(col) = expr.as_ref() {
-                            let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
-                            Ok(PhysicalSortExpr {
-                                expr,
-                                options: SortOptions {
-                                    descending: !asc,
-                                    nulls_first: *nulls_first,
-                                },
-                            })
-                        } else {
-                            plan_err!("Expected single column references in output_ordering, got {expr}")
-                        }
-                    } else {
-                        plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
-                    }
-                })
-                .collect::<Result<Vec<_>>>()?;
-            all_sort_orders.push(sort_exprs);
-        }
-        Ok(all_sort_orders)
+        create_ordering(&self.table_schema, &self.options.file_sort_order)
     }
 }
 
@@ -1040,9 +1011,11 @@ mod tests {
 
     use arrow::datatypes::{DataType, Schema};
     use arrow::record_batch::RecordBatch;
+    use arrow_schema::SortOptions;
     use datafusion_common::stats::Precision;
     use datafusion_common::{assert_contains, GetExt, ScalarValue};
     use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
+    use datafusion_physical_expr::PhysicalSortExpr;
     use rstest::*;
     use tempfile::TempDir;
 
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 26f4051897..f9a7ab04ce 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -44,18 +44,13 @@ use datafusion_expr::CreateExternalTable;
 use async_trait::async_trait;
 
 /// A `TableProviderFactory` capable of creating new `ListingTable`s
+#[derive(Debug, Default)]
 pub struct ListingTableFactory {}
 
 impl ListingTableFactory {
     /// Creates a new `ListingTableFactory`
     pub fn new() -> Self {
-        Self {}
-    }
-}
-
-impl Default for ListingTableFactory {
-    fn default() -> Self {
-        Self::new()
+        Self::default()
     }
 }
 
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 48e9d69921..45f9bee6a5 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -29,6 +29,7 @@ pub mod memory;
 pub mod physical_plan;
 pub mod provider;
 mod statistics;
+pub mod stream;
 pub mod streaming;
 pub mod view;
 
@@ -43,3 +44,46 @@ pub use self::provider::TableProvider;
 pub use self::view::ViewTable;
 pub use crate::logical_expr::TableType;
 pub use statistics::get_statistics_with_limit;
+
+use arrow_schema::{Schema, SortOptions};
+use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_expr::Expr;
+use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};
+
+fn create_ordering(
+    schema: &Schema,
+    sort_order: &[Vec<Expr>],
+) -> Result<Vec<LexOrdering>> {
+    let mut all_sort_orders = vec![];
+
+    for exprs in sort_order {
+        // Construct PhysicalSortExpr objects from Expr objects:
+        let mut sort_exprs = vec![];
+        for expr in exprs {
+            match expr {
+                Expr::Sort(sort) => match sort.expr.as_ref() {
+                    Expr::Column(col) => match expressions::col(&col.name, schema) {
+                        Ok(expr) => {
+                            sort_exprs.push(PhysicalSortExpr {
+                                expr,
+                                options: SortOptions {
+                                    descending: !sort.asc,
+                                    nulls_first: sort.nulls_first,
+                                },
+                            });
+                        }
+                        // Cannot find the expression in the projected schema; stop iterating
+                        // since the rest of the ordering is violated
+                        Err(_) => break,
+                    }
+                    expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
+                }
+                expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
+            }
+        }
+        if !sort_exprs.is_empty() {
+            all_sort_orders.push(sort_exprs);
+        }
+    }
+    Ok(all_sort_orders)
+}
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 7d9f9e86d6..4fe433044e 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -26,6 +26,8 @@ use datafusion_expr::{CreateExternalTable, LogicalPlan};
 pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
 
 use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::listing_table_factory::ListingTableFactory;
+use crate::datasource::stream::StreamTableFactory;
 use crate::error::Result;
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
@@ -214,3 +216,41 @@ pub trait TableProviderFactory: Sync + Send {
         cmd: &CreateExternalTable,
     ) -> Result<Arc<dyn TableProvider>>;
 }
+
+/// The default [`TableProviderFactory`]
+///
+/// If [`CreateExternalTable`] is unbounded, calls [`StreamTableFactory::create`];
+/// otherwise calls [`ListingTableFactory::create`].
+#[derive(Debug, Default)]
+pub struct DefaultTableFactory {
+    stream: StreamTableFactory,
+    listing: ListingTableFactory,
+}
+
+impl DefaultTableFactory {
+    /// Creates a new [`DefaultTableFactory`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+#[async_trait]
+impl TableProviderFactory for DefaultTableFactory {
+    async fn create(
+        &self,
+        state: &SessionState,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let mut unbounded = cmd.unbounded;
+        for (k, v) in &cmd.options {
+            if k.eq_ignore_ascii_case("unbounded") && v.eq_ignore_ascii_case("true") {
+                unbounded = true
+            }
+        }
+
+        match unbounded {
+            true => self.stream.create(state, cmd).await,
+            false => self.listing.create(state, cmd).await,
+        }
+    }
+}
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
new file mode 100644
index 0000000000..cf95dd249a
--- /dev/null
+++ b/datafusion/core/src/datasource/stream.rs
@@ -0,0 +1,326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TableProvider for stream sources, such as FIFO files
+
+use std::any::Any;
+use std::fmt::Formatter;
+use std::fs::{File, OpenOptions};
+use std::io::BufReader;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow_array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use futures::StreamExt;
+use tokio::task::spawn_blocking;
+
+use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
+use datafusion_physical_plan::common::AbortOnDropSingle;
+use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+use datafusion_physical_plan::metrics::MetricsSet;
+use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+use crate::datasource::provider::TableProviderFactory;
+use crate::datasource::{create_ordering, TableProvider};
+use crate::execution::context::SessionState;
+
+/// A [`TableProviderFactory`] for [`StreamTable`]
+#[derive(Debug, Default)]
+pub struct StreamTableFactory {}
+
+#[async_trait]
+impl TableProviderFactory for StreamTableFactory {
+    async fn create(
+        &self,
+        state: &SessionState,
+        cmd: &CreateExternalTable,
+    ) -> Result<Arc<dyn TableProvider>> {
+        let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
+        let location = cmd.location.clone();
+        let encoding = cmd.file_type.parse()?;
+
+        let config = StreamConfig::new_file(schema, location.into())
+            .with_encoding(encoding)
+            .with_order(cmd.order_exprs.clone())
+            .with_header(cmd.has_header)
+            .with_batch_size(state.config().batch_size());
+
+        Ok(Arc::new(StreamTable(Arc::new(config))))
+    }
+}
+
+/// The data encoding for [`StreamTable`]
+#[derive(Debug, Clone)]
+pub enum StreamEncoding {
+    /// CSV records
+    Csv,
+    /// Newline-delimited JSON records
+    Json,
+}
+
+impl FromStr for StreamEncoding {
+    type Err = DataFusionError;
+
+    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+        match s.to_ascii_lowercase().as_str() {
+            "csv" => Ok(Self::Csv),
+            "json" => Ok(Self::Json),
+            _ => plan_err!("Unrecognised StreamEncoding {}", s),
+        }
+    }
+}
+
+/// The configuration for a [`StreamTable`]
+#[derive(Debug)]
+pub struct StreamConfig {
+    schema: SchemaRef,
+    location: PathBuf,
+    batch_size: usize,
+    encoding: StreamEncoding,
+    header: bool,
+    order: Vec<Vec<Expr>>,
+}
+
+impl StreamConfig {
+    /// Stream data from the file at `location`
+    pub fn new_file(schema: SchemaRef, location: PathBuf) -> Self {
+        Self {
+            schema,
+            location,
+            batch_size: 1024,
+            encoding: StreamEncoding::Csv,
+            order: vec![],
+            header: false,
+        }
+    }
+
+    /// Specify a sort order for the stream
+    pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
+        self.order = order;
+        self
+    }
+
+    /// Specify the batch size
+    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+        self.batch_size = batch_size;
+        self
+    }
+
+    /// Specify whether the file has a header (only applicable for [`StreamEncoding::Csv`])
+    pub fn with_header(mut self, header: bool) -> Self {
+        self.header = header;
+        self
+    }
+
+    /// Specify an encoding for the stream
+    pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self {
+        self.encoding = encoding;
+        self
+    }
+
+    fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
+        let file = File::open(&self.location)?;
+        let schema = self.schema.clone();
+        match &self.encoding {
+            StreamEncoding::Csv => {
+                let reader = arrow::csv::ReaderBuilder::new(schema)
+                    .with_header(self.header)
+                    .with_batch_size(self.batch_size)
+                    .build(file)?;
+
+                Ok(Box::new(reader))
+            }
+            StreamEncoding::Json => {
+                let reader = arrow::json::ReaderBuilder::new(schema)
+                    .with_batch_size(self.batch_size)
+                    .build(BufReader::new(file))?;
+
+                Ok(Box::new(reader))
+            }
+        }
+    }
+
+    fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+        match &self.encoding {
+            StreamEncoding::Csv => {
+                let header = self.header && !self.location.exists();
+                let file = OpenOptions::new().write(true).open(&self.location)?;
+                let writer = arrow::csv::WriterBuilder::new()
+                    .with_header(header)
+                    .build(file);
+
+                Ok(Box::new(writer))
+            }
+            StreamEncoding::Json => {
+                let file = OpenOptions::new().write(true).open(&self.location)?;
+                Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
+            }
+        }
+    }
+}
+
+/// A [`TableProvider`] for a stream source, such as a FIFO file
+pub struct StreamTable(Arc<StreamConfig>);
+
+impl StreamTable {
+    /// Create a new [`StreamTable`] for the given `StreamConfig`
+    pub fn new(config: Arc<StreamConfig>) -> Self {
+        Self(config)
+    }
+}
+
+#[async_trait]
+impl TableProvider for StreamTable {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn schema(&self) -> SchemaRef {
+        self.0.schema.clone()
+    }
+
+    fn table_type(&self) -> TableType {
+        TableType::Base
+    }
+
+    async fn scan(
+        &self,
+        _state: &SessionState,
+        projection: Option<&Vec<usize>>,
+        _filters: &[Expr],
+        _limit: Option<usize>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let projected_schema = match projection {
+            Some(p) => {
+                let projected = self.0.schema.project(p)?;
+                create_ordering(&projected, &self.0.order)?
+            }
+            None => create_ordering(self.0.schema.as_ref(), &self.0.order)?,
+        };
+
+        Ok(Arc::new(StreamingTableExec::try_new(
+            self.0.schema.clone(),
+            vec![Arc::new(StreamRead(self.0.clone())) as _],
+            projection,
+            projected_schema,
+            true,
+        )?))
+    }
+
+    async fn insert_into(
+        &self,
+        _state: &SessionState,
+        input: Arc<dyn ExecutionPlan>,
+        _overwrite: bool,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let ordering = match self.0.order.first() {
+            Some(x) => {
+                let schema = self.0.schema.as_ref();
+                let orders = create_ordering(schema, std::slice::from_ref(x))?;
+                let ordering = orders.into_iter().next().unwrap();
+                Some(ordering.into_iter().map(Into::into).collect())
+            }
+            None => None,
+        };
+
+        Ok(Arc::new(FileSinkExec::new(
+            input,
+            Arc::new(StreamWrite(self.0.clone())),
+            self.0.schema.clone(),
+            ordering,
+        )))
+    }
+}
+
+struct StreamRead(Arc<StreamConfig>);
+
+impl PartitionStream for StreamRead {
+    fn schema(&self) -> &SchemaRef {
+        &self.0.schema
+    }
+
+    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+        let config = self.0.clone();
+        let schema = self.0.schema.clone();
+        let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
+        let tx = builder.tx();
+        builder.spawn_blocking(move || {
+            let reader = config.reader()?;
+            for b in reader {
+                if tx.blocking_send(b.map_err(Into::into)).is_err() {
+                    break;
+                }
+            }
+            Ok(())
+        });
+        builder.build()
+    }
+}
+
+#[derive(Debug)]
+struct StreamWrite(Arc<StreamConfig>);
+
+impl DisplayAs for StreamWrite {
+    fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+        write!(f, "{self:?}")
+    }
+}
+
+#[async_trait]
+impl DataSink for StreamWrite {
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        None
+    }
+
+    async fn write_all(
+        &self,
+        mut data: SendableRecordBatchStream,
+        _context: &Arc<TaskContext>,
+    ) -> Result<u64> {
+        let config = self.0.clone();
+        let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
+        // Note: FIFO Files support poll so this could use AsyncFd
+        let write = AbortOnDropSingle::new(spawn_blocking(move || {
+            let mut count = 0_u64;
+            let mut writer = config.writer()?;
+            while let Some(batch) = receiver.blocking_recv() {
+                count += batch.num_rows() as u64;
+                writer.write(&batch)?;
+            }
+            Ok(count)
+        }));
+
+        while let Some(b) = data.next().await.transpose()? {
+            if sender.send(b).await.is_err() {
+                break;
+            }
+        }
+        drop(sender);
+        write.await.unwrap()
+    }
+}
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 5c79c407b7..b8e111d361 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -27,7 +27,6 @@ use crate::{
     catalog::{CatalogList, MemoryCatalogList},
     datasource::{
         listing::{ListingOptions, ListingTable},
-        listing_table_factory::ListingTableFactory,
         provider::TableProviderFactory,
     },
     datasource::{MemTable, ViewTable},
@@ -111,6 +110,7 @@ use datafusion_sql::planner::object_name_to_table_reference;
 use uuid::Uuid;
 
 // backwards compatibility
+use crate::datasource::provider::DefaultTableFactory;
 use crate::execution::options::ArrowReadOptions;
 pub use datafusion_execution::config::SessionConfig;
 pub use datafusion_execution::TaskContext;
@@ -1285,12 +1285,12 @@ impl SessionState {
         let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
             HashMap::new();
         #[cfg(feature = "parquet")]
-        table_factories.insert("PARQUET".into(), Arc::new(ListingTableFactory::new()));
-        table_factories.insert("CSV".into(), Arc::new(ListingTableFactory::new()));
-        table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new()));
-        table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new()));
-        table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new()));
-        table_factories.insert("ARROW".into(), Arc::new(ListingTableFactory::new()));
+        table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
+        table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
 
         if config.create_default_catalog_and_schema() {
             let default_catalog = MemoryCatalogProvider::new();
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index 7d9ea97f7b..93c7f73680 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -17,42 +17,48 @@
 
 //! This test demonstrates the DataFusion FIFO capabilities.
 //!
-#[cfg(not(target_os = "windows"))]
+#[cfg(target_family = "unix")]
 #[cfg(test)]
 mod unix_test {
-    use arrow::array::Array;
-    use arrow::csv::ReaderBuilder;
-    use arrow::datatypes::{DataType, Field, Schema};
-    use datafusion::test_util::register_unbounded_file_with_ordering;
-    use datafusion::{
-        prelude::{CsvReadOptions, SessionConfig, SessionContext},
-        test_util::{aggr_test_schema, arrow_test_data},
-    };
-    use datafusion_common::{exec_err, DataFusionError, Result};
-    use futures::StreamExt;
-    use itertools::enumerate;
-    use nix::sys::stat;
-    use nix::unistd;
-    use rstest::*;
     use std::fs::{File, OpenOptions};
     use std::io::Write;
     use std::path::PathBuf;
     use std::sync::atomic::{AtomicBool, Ordering};
     use std::sync::Arc;
     use std::thread;
-    use std::thread::JoinHandle;
     use std::time::{Duration, Instant};
+
+    use arrow::array::Array;
+    use arrow::csv::ReaderBuilder;
+    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow_schema::SchemaRef;
+    use futures::StreamExt;
+    use nix::sys::stat;
+    use nix::unistd;
     use tempfile::TempDir;
+    use tokio::task::{spawn_blocking, JoinHandle};
 
-    // !  For the sake of the test, do not alter the numbers. !
-    // Session batch size
-    const TEST_BATCH_SIZE: usize = 20;
-    // Number of lines written to FIFO
-    const TEST_DATA_SIZE: usize = 20_000;
-    // Number of lines what can be joined. Each joinable key produced 20 lines with
-    // aggregate_test_100 dataset. We will use these joinable keys for understanding
-    // incremental execution.
-    const TEST_JOIN_RATIO: f64 = 0.01;
+    use datafusion::datasource::stream::{StreamConfig, StreamTable};
+    use datafusion::datasource::TableProvider;
+    use datafusion::{
+        prelude::{CsvReadOptions, SessionConfig, SessionContext},
+        test_util::{aggr_test_schema, arrow_test_data},
+    };
+    use datafusion_common::{exec_err, DataFusionError, Result};
+    use datafusion_expr::Expr;
+
+    /// Makes a TableProvider for a fifo file
+    fn fifo_table(
+        schema: SchemaRef,
+        path: impl Into<PathBuf>,
+        sort: Vec<Vec<Expr>>,
+    ) -> Arc<dyn TableProvider> {
+        let config = StreamConfig::new_file(schema, path.into())
+            .with_order(sort)
+            .with_batch_size(TEST_BATCH_SIZE)
+            .with_header(true);
+        Arc::new(StreamTable::new(Arc::new(config)))
+    }
 
     fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
         let file_path = tmp_dir.path().join(file_name);
@@ -86,14 +92,46 @@ mod unix_test {
         Ok(())
     }
 
+    fn create_writing_thread(
+        file_path: PathBuf,
+        header: String,
+        lines: Vec<String>,
+        waiting_lock: Arc<AtomicBool>,
+        wait_until: usize,
+    ) -> JoinHandle<()> {
+        // Timeout for a long period of BrokenPipe error
+        let broken_pipe_timeout = Duration::from_secs(10);
+        let sa = file_path.clone();
+        // Spawn a new thread to write to the FIFO file
+        spawn_blocking(move || {
+            let file = OpenOptions::new().write(true).open(sa).unwrap();
+            // Reference time to use when deciding to fail the test
+            let execution_start = Instant::now();
+            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
+            for (cnt, line) in lines.iter().enumerate() {
+                while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+                    thread::sleep(Duration::from_millis(50));
+                }
+                write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
+            }
+            drop(file);
+        })
+    }
+
+    // !  For the sake of the test, do not alter the numbers. !
+    // Session batch size
+    const TEST_BATCH_SIZE: usize = 20;
+    // Number of lines written to FIFO
+    const TEST_DATA_SIZE: usize = 20_000;
+    // Ratio of lines that can be joined. Each joinable key produces 20 lines with
+    // the aggregate_test_100 dataset. We will use these joinable keys for understanding
+    // incremental execution.
+    const TEST_JOIN_RATIO: f64 = 0.01;
+
     // This test provides a relatively realistic end-to-end scenario where
     // we swap join sides to accommodate a FIFO source.
-    #[rstest]
-    #[timeout(std::time::Duration::from_secs(30))]
     #[tokio::test(flavor = "multi_thread", worker_threads = 8)]
-    async fn unbounded_file_with_swapped_join(
-        #[values(true, false)] unbounded_file: bool,
-    ) -> Result<()> {
+    async fn unbounded_file_with_swapped_join() -> Result<()> {
         // Create session context
         let config = SessionConfig::new()
             .with_batch_size(TEST_BATCH_SIZE)
@@ -101,11 +139,10 @@ mod unix_test {
             .with_target_partitions(1);
         let ctx = SessionContext::new_with_config(config);
         // To make unbounded deterministic
-        let waiting = Arc::new(AtomicBool::new(unbounded_file));
+        let waiting = Arc::new(AtomicBool::new(true));
         // Create a new temporary FIFO file
         let tmp_dir = TempDir::new()?;
-        let fifo_path =
-            create_fifo_file(&tmp_dir, &format!("fifo_{unbounded_file:?}.csv"))?;
+        let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
         // Execution can calculated at least one RecordBatch after the number of
         // "joinable_lines_length" lines are read.
         let joinable_lines_length =
@@ -129,7 +166,7 @@ mod unix_test {
             "a1,a2\n".to_owned(),
             lines,
             waiting.clone(),
-            joinable_lines_length,
+            joinable_lines_length * 2,
         );
 
         // Data Schema
@@ -137,15 +174,10 @@ mod unix_test {
             Field::new("a1", DataType::Utf8, false),
             Field::new("a2", DataType::UInt32, false),
         ]));
-        // Create a file with bounded or unbounded flag.
-        ctx.register_csv(
-            "left",
-            fifo_path.as_os_str().to_str().unwrap(),
-            CsvReadOptions::new()
-                .schema(schema.as_ref())
-                .mark_infinite(unbounded_file),
-        )
-        .await?;
+
+        let provider = fifo_table(schema, fifo_path, vec![]);
+        ctx.register_table("left", provider).unwrap();
+
         // Register right table
         let schema = aggr_test_schema();
         let test_data = arrow_test_data();
@@ -161,7 +193,7 @@ mod unix_test {
         while (stream.next().await).is_some() {
             waiting.store(false, Ordering::SeqCst);
         }
-        task.join().unwrap();
+        task.await.unwrap();
         Ok(())
     }
 
@@ -172,39 +204,10 @@ mod unix_test {
         Equal,
     }
 
-    fn create_writing_thread(
-        file_path: PathBuf,
-        header: String,
-        lines: Vec<String>,
-        waiting_lock: Arc<AtomicBool>,
-        wait_until: usize,
-    ) -> JoinHandle<()> {
-        // Timeout for a long period of BrokenPipe error
-        let broken_pipe_timeout = Duration::from_secs(10);
-        // Spawn a new thread to write to the FIFO file
-        thread::spawn(move || {
-            let file = OpenOptions::new().write(true).open(file_path).unwrap();
-            // Reference time to use when deciding to fail the test
-            let execution_start = Instant::now();
-            write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
-            for (cnt, line) in enumerate(lines) {
-                while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
-                    thread::sleep(Duration::from_millis(50));
-                }
-                write_to_fifo(&file, &line, execution_start, broken_pipe_timeout)
-                    .unwrap();
-            }
-            drop(file);
-        })
-    }
-
     // This test provides a relatively realistic end-to-end scenario where
     // we change the join into a [SymmetricHashJoin] to accommodate two
     // unbounded (FIFO) sources.
-    #[rstest]
-    #[timeout(std::time::Duration::from_secs(30))]
-    #[tokio::test(flavor = "multi_thread")]
-    #[ignore]
+    #[tokio::test]
     async fn unbounded_file_with_symmetric_join() -> Result<()> {
         // Create session context
         let config = SessionConfig::new()
@@ -254,47 +257,30 @@ mod unix_test {
             Field::new("a1", DataType::UInt32, false),
             Field::new("a2", DataType::UInt32, false),
         ]));
+
         // Specify the ordering:
-        let file_sort_order = vec![[datafusion_expr::col("a1")]
-            .into_iter()
-            .map(|e| {
-                let ascending = true;
-                let nulls_first = false;
-                e.sort(ascending, nulls_first)
-            })
-            .collect::<Vec<_>>()];
+        let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
+
         // Set unbounded sorted files read configuration
-        register_unbounded_file_with_ordering(
-            &ctx,
-            schema.clone(),
-            &left_fifo,
-            "left",
-            file_sort_order.clone(),
-            true,
-        )
-        .await?;
-        register_unbounded_file_with_ordering(
-            &ctx,
-            schema,
-            &right_fifo,
-            "right",
-            file_sort_order,
-            true,
-        )
-        .await?;
+        let provider = fifo_table(schema.clone(), left_fifo, order.clone());
+        ctx.register_table("left", provider)?;
+
+        let provider = fifo_table(schema.clone(), right_fifo, order);
+        ctx.register_table("right", provider)?;
+
         // Execute the query, with no matching rows. (since key is modulus 10)
         let df = ctx
             .sql(
                 "SELECT
-                                      t1.a1,
-                                      t1.a2,
-                                      t2.a1,
-                                      t2.a2
-                                    FROM
-                                      left as t1 FULL
-                                      JOIN right as t2 ON t1.a2 = t2.a2
-                                      AND t1.a1 > t2.a1 + 4
-                                      AND t1.a1 < t2.a1 + 9",
+                  t1.a1,
+                  t1.a2,
+                  t2.a1,
+                  t2.a2
+                FROM
+                  left as t1 FULL
+                  JOIN right as t2 ON t1.a2 = t2.a2
+                  AND t1.a1 > t2.a1 + 4
+                  AND t1.a1 < t2.a1 + 9",
             )
             .await?;
         let mut stream = df.execute_stream().await?;
@@ -313,7 +299,8 @@ mod unix_test {
             };
             operations.push(op);
         }
-        tasks.into_iter().for_each(|jh| jh.join().unwrap());
+        futures::future::try_join_all(tasks).await.unwrap();
+
         // The SymmetricHashJoin executor produces FULL join results at every
         // pruning, which happens before it reaches the end of input and more
         // than once. In this test, we feed partially joinable data to both
@@ -368,8 +355,9 @@ mod unix_test {
         // Prevent move
         let (sink_fifo_path_thread, sink_display_fifo_path) =
             (sink_fifo_path.clone(), sink_fifo_path.display());
+
         // Spawn a new thread to read sink EXTERNAL TABLE.
-        tasks.push(thread::spawn(move || {
+        tasks.push(spawn_blocking(move || {
             let file = File::open(sink_fifo_path_thread).unwrap();
             let schema = Arc::new(Schema::new(vec![
                 Field::new("a1", DataType::Utf8, false),
@@ -377,7 +365,6 @@ mod unix_test {
             ]));
 
             let mut reader = ReaderBuilder::new(schema)
-                .with_header(true)
                 .with_batch_size(TEST_BATCH_SIZE)
                 .build(file)
                 .map_err(|e| DataFusionError::Internal(e.to_string()))
@@ -389,38 +376,35 @@ mod unix_test {
         }));
         // register second csv file with the SQL (create an empty file if not found)
         ctx.sql(&format!(
-            "CREATE EXTERNAL TABLE source_table (
+            "CREATE UNBOUNDED EXTERNAL TABLE source_table (
                 a1  VARCHAR NOT NULL,
                 a2  INT NOT NULL
             )
             STORED AS CSV
             WITH HEADER ROW
-            OPTIONS ('UNBOUNDED' 'TRUE')
             LOCATION '{source_display_fifo_path}'"
         ))
         .await?;
 
         // register csv file with the SQL
         ctx.sql(&format!(
-            "CREATE EXTERNAL TABLE sink_table (
+            "CREATE UNBOUNDED EXTERNAL TABLE sink_table (
                 a1  VARCHAR NOT NULL,
                 a2  INT NOT NULL
             )
             STORED AS CSV
             WITH HEADER ROW
-            OPTIONS ('UNBOUNDED' 'TRUE')
             LOCATION '{sink_display_fifo_path}'"
         ))
         .await?;
 
         let df = ctx
-            .sql(
-                "INSERT INTO sink_table
-            SELECT a1, a2 FROM source_table",
-            )
+            .sql("INSERT INTO sink_table SELECT a1, a2 FROM source_table")
             .await?;
+
+        // Start execution
         df.collect().await?;
-        tasks.into_iter().for_each(|jh| jh.join().unwrap());
+        futures::future::try_join_all(tasks).await.unwrap();
         Ok(())
     }
 }
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index 1923a5f3ab..b0eaa2b42f 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -55,7 +55,7 @@ pub struct StreamingTableExec {
     partitions: Vec<Arc<dyn PartitionStream>>,
     projection: Option<Arc<[usize]>>,
     projected_schema: SchemaRef,
-    projected_output_ordering: Option<LexOrdering>,
+    projected_output_ordering: Vec<LexOrdering>,
     infinite: bool,
 }
 
@@ -65,7 +65,7 @@ impl StreamingTableExec {
         schema: SchemaRef,
         partitions: Vec<Arc<dyn PartitionStream>>,
         projection: Option<&Vec<usize>>,
-        projected_output_ordering: Option<LexOrdering>,
+        projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
         infinite: bool,
     ) -> Result<Self> {
         for x in partitions.iter() {
@@ -88,7 +88,7 @@ impl StreamingTableExec {
             partitions,
             projected_schema,
             projection: projection.cloned().map(Into::into),
-            projected_output_ordering,
+            projected_output_ordering: projected_output_ordering.into_iter().collect(),
             infinite,
         })
     }
@@ -125,7 +125,7 @@ impl DisplayAs for StreamingTableExec {
                 }
 
                 self.projected_output_ordering
-                    .as_deref()
+                    .first()
                     .map_or(Ok(()), |ordering| {
                         if !ordering.is_empty() {
                             write!(
@@ -160,15 +160,16 @@ impl ExecutionPlan for StreamingTableExec {
     }
 
     fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.projected_output_ordering.as_deref()
+        self.projected_output_ordering
+            .first()
+            .map(|ordering| ordering.as_slice())
     }
 
     fn equivalence_properties(&self) -> EquivalenceProperties {
-        let mut result = EquivalenceProperties::new(self.schema());
-        if let Some(ordering) = &self.projected_output_ordering {
-            result.add_new_orderings([ordering.clone()])
-        }
-        result
+        EquivalenceProperties::new_with_orderings(
+            self.schema(),
+            &self.projected_output_ordering,
+        )
     }
 
     fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt
index ed4f4b4a11..682972b557 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -750,7 +750,7 @@ query TT
 explain select c1 from t;
 ----
 logical_plan TableScan: t projection=[c1]
-physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], infinite_source=true, has_header=true
+physical_plan StreamingTableExec: partition_sizes=1, projection=[c1], infinite_source=true
 
 statement ok
 drop table t;
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index 300e92a735..4438d69af3 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2115,7 +2115,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, SUM(annotate
 physical_plan
 ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as summation1]
 --AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 
 query III
@@ -2146,7 +2146,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotate
 physical_plan
 ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1]
 --AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallySorted([1])
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]
 
 query III
 SELECT a, d,
@@ -2179,7 +2179,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
 --AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 query III
 SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
@@ -2205,7 +2205,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c]
 --AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 query III
 SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
@@ -2232,7 +2232,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a
 physical_plan
 ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)@2 as last_c]
 --AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 query III
 SELECT a, b, LAST_VALUE(c) as last_c
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 8be02b846c..a3c57a67a6 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2812,7 +2812,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2
 ----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), e [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NUL [...]
-----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 
 query IIII
@@ -2858,7 +2858,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2
 ----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), e [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NUL [...]
-----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 
 query IIII
@@ -2962,7 +2962,7 @@ ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, SUM(annotated_data_infinite2
 ------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AN [...]
 --------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict [...]
 ----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 
 query IIIIIIIIIIIIIII
@@ -3104,7 +3104,7 @@ CoalesceBatchesExec: target_batch_size=4096
 ----GlobalLimitExec: skip=0, fetch=5
 ------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
 --------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+----------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
 
 # this is a negative test for asserting that window functions (other than ROW_NUMBER)
 # are not added to ordering equivalence
@@ -3217,7 +3217,7 @@ ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_da
 ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable:  [...]
 --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable [...]
 ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullab [...]
-------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 statement ok
 set datafusion.execution.target_partitions = 2;
@@ -3255,7 +3255,7 @@ ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_da
 ------------------------CoalesceBatchesExec: target_batch_size=4096
 --------------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST
 ----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+------------------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
 
 # reset the partition number 1 again
 statement ok
@@ -3521,4 +3521,4 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
 ------CoalesceBatchesExec: target_batch_size=4096
 --------SortPreservingRepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST
 ----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], has_header=true
+------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]