Posted to commits@arrow.apache.org by tu...@apache.org on 2023/11/15 12:48:33 UTC
(arrow-datafusion) branch main updated: Implement StreamTable and StreamTableProvider (#7994) (#8021)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 020b8fc761 Implement StreamTable and StreamTableProvider (#7994) (#8021)
020b8fc761 is described below
commit 020b8fc7619cfa392638da76456331b034479874
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Wed Nov 15 12:48:27 2023 +0000
Implement StreamTable and StreamTableProvider (#7994) (#8021)
* Implement FIFO using extension points (#7994)
* Clippy
* Rename to StreamTable and make public
* Add StreamEncoding
* Rework sort order
* Fix logical conflicts
* Format
* Add DefaultTableProvider
* Fix doc
* Fix project sort keys and CSV headers
* Respect batch size on read
* Tests are updated
* Resolving clippy
---------
Co-authored-by: metesynnada <10...@users.noreply.github.com>
---
datafusion/core/src/datasource/listing/table.rs | 37 +--
.../core/src/datasource/listing_table_factory.rs | 9 +-
datafusion/core/src/datasource/mod.rs | 44 +++
datafusion/core/src/datasource/provider.rs | 40 +++
datafusion/core/src/datasource/stream.rs | 326 +++++++++++++++++++++
datafusion/core/src/execution/context/mod.rs | 14 +-
datafusion/core/tests/fifo.rs | 226 +++++++-------
datafusion/physical-plan/src/streaming.rs | 21 +-
datafusion/sqllogictest/test_files/ddl.slt | 2 +-
datafusion/sqllogictest/test_files/groupby.slt | 10 +-
datafusion/sqllogictest/test_files/window.slt | 14 +-
11 files changed, 553 insertions(+), 190 deletions(-)
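
For context while reading the diff below, here is a minimal sketch of how the new stream source is intended to be used, modelled on the fifo_table helper added to datafusion/core/tests/fifo.rs in this commit. The table name, schema, and FIFO path are illustrative placeholders, not part of the change:

    use std::sync::Arc;

    use arrow_schema::{DataType, Field, Schema};
    use datafusion::datasource::stream::{StreamConfig, StreamEncoding, StreamTable};
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    fn register_fifo_source(ctx: &SessionContext) -> Result<()> {
        // Schema of the records arriving on the FIFO file (illustrative).
        let schema = Arc::new(Schema::new(vec![
            Field::new("a1", DataType::Utf8, false),
            Field::new("a2", DataType::UInt32, false),
        ]));

        // Describe the stream source: location, encoding, CSV header, batch size.
        let config = StreamConfig::new_file(schema, "/tmp/my_fifo.csv".into())
            .with_encoding(StreamEncoding::Csv)
            .with_header(true)
            .with_batch_size(20);

        // Register it like any other TableProvider; scans plan a StreamingTableExec.
        ctx.register_table("fifo_source", Arc::new(StreamTable::new(Arc::new(config))))?;
        Ok(())
    }
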
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index d26d417bd8..c22eb58e88 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -26,6 +26,7 @@ use super::PartitionedFile;
#[cfg(feature = "parquet")]
use crate::datasource::file_format::parquet::ParquetFormat;
use crate::datasource::{
+ create_ordering,
file_format::{
arrow::ArrowFormat,
avro::AvroFormat,
@@ -40,7 +41,6 @@ use crate::datasource::{
TableProvider, TableType,
};
use crate::logical_expr::TableProviderFilterPushDown;
-use crate::physical_plan;
use crate::{
error::{DataFusionError, Result},
execution::context::SessionState,
@@ -48,7 +48,6 @@ use crate::{
physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics},
};
-use arrow::compute::SortOptions;
use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef};
use arrow_schema::Schema;
use datafusion_common::{
@@ -57,10 +56,9 @@ use datafusion_common::{
};
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
-use datafusion_expr::expr::Sort;
use datafusion_optimizer::utils::conjunction;
use datafusion_physical_expr::{
- create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement,
+ create_physical_expr, LexOrdering, PhysicalSortRequirement,
};
use async_trait::async_trait;
@@ -677,34 +675,7 @@ impl ListingTable {
/// If file_sort_order is specified, creates the appropriate physical expressions
fn try_create_output_ordering(&self) -> Result<Vec<LexOrdering>> {
- let mut all_sort_orders = vec![];
-
- for exprs in &self.options.file_sort_order {
- // Construct PhsyicalSortExpr objects from Expr objects:
- let sort_exprs = exprs
- .iter()
- .map(|expr| {
- if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr {
- if let Expr::Column(col) = expr.as_ref() {
- let expr = physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?;
- Ok(PhysicalSortExpr {
- expr,
- options: SortOptions {
- descending: !asc,
- nulls_first: *nulls_first,
- },
- })
- } else {
- plan_err!("Expected single column references in output_ordering, got {expr}")
- }
- } else {
- plan_err!("Expected Expr::Sort in output_ordering, but got {expr}")
- }
- })
- .collect::<Result<Vec<_>>>()?;
- all_sort_orders.push(sort_exprs);
- }
- Ok(all_sort_orders)
+ create_ordering(&self.table_schema, &self.options.file_sort_order)
}
}
@@ -1040,9 +1011,11 @@ mod tests {
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
+ use arrow_schema::SortOptions;
use datafusion_common::stats::Precision;
use datafusion_common::{assert_contains, GetExt, ScalarValue};
use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator};
+ use datafusion_physical_expr::PhysicalSortExpr;
use rstest::*;
use tempfile::TempDir;
diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs
index 26f4051897..f9a7ab04ce 100644
--- a/datafusion/core/src/datasource/listing_table_factory.rs
+++ b/datafusion/core/src/datasource/listing_table_factory.rs
@@ -44,18 +44,13 @@ use datafusion_expr::CreateExternalTable;
use async_trait::async_trait;
/// A `TableProviderFactory` capable of creating new `ListingTable`s
+#[derive(Debug, Default)]
pub struct ListingTableFactory {}
impl ListingTableFactory {
/// Creates a new `ListingTableFactory`
pub fn new() -> Self {
- Self {}
- }
-}
-
-impl Default for ListingTableFactory {
- fn default() -> Self {
- Self::new()
+ Self::default()
}
}
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 48e9d69921..45f9bee6a5 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -29,6 +29,7 @@ pub mod memory;
pub mod physical_plan;
pub mod provider;
mod statistics;
+pub mod stream;
pub mod streaming;
pub mod view;
@@ -43,3 +44,46 @@ pub use self::provider::TableProvider;
pub use self::view::ViewTable;
pub use crate::logical_expr::TableType;
pub use statistics::get_statistics_with_limit;
+
+use arrow_schema::{Schema, SortOptions};
+use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_expr::Expr;
+use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr};
+
+fn create_ordering(
+ schema: &Schema,
+ sort_order: &[Vec<Expr>],
+) -> Result<Vec<LexOrdering>> {
+ let mut all_sort_orders = vec![];
+
+ for exprs in sort_order {
+ // Construct PhysicalSortExpr objects from Expr objects:
+ let mut sort_exprs = vec![];
+ for expr in exprs {
+ match expr {
+ Expr::Sort(sort) => match sort.expr.as_ref() {
+ Expr::Column(col) => match expressions::col(&col.name, schema) {
+ Ok(expr) => {
+ sort_exprs.push(PhysicalSortExpr {
+ expr,
+ options: SortOptions {
+ descending: !sort.asc,
+ nulls_first: sort.nulls_first,
+ },
+ });
+ }
+ // Cannot find expression in the projected_schema, stop iterating
+ // since rest of the orderings are violated
+ Err(_) => break,
+ }
+ expr => return plan_err!("Expected single column references in output_ordering, got {expr}"),
+ }
+ expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"),
+ }
+ }
+ if !sort_exprs.is_empty() {
+ all_sort_orders.push(sort_exprs);
+ }
+ }
+ Ok(all_sort_orders)
+}
diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs
index 7d9f9e86d6..4fe433044e 100644
--- a/datafusion/core/src/datasource/provider.rs
+++ b/datafusion/core/src/datasource/provider.rs
@@ -26,6 +26,8 @@ use datafusion_expr::{CreateExternalTable, LogicalPlan};
pub use datafusion_expr::{TableProviderFilterPushDown, TableType};
use crate::arrow::datatypes::SchemaRef;
+use crate::datasource::listing_table_factory::ListingTableFactory;
+use crate::datasource::stream::StreamTableFactory;
use crate::error::Result;
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
@@ -214,3 +216,41 @@ pub trait TableProviderFactory: Sync + Send {
cmd: &CreateExternalTable,
) -> Result<Arc<dyn TableProvider>>;
}
+
+/// The default [`TableProviderFactory`]
+///
+/// If [`CreateExternalTable`] is unbounded calls [`StreamTableFactory::create`],
+/// otherwise calls [`ListingTableFactory::create`]
+#[derive(Debug, Default)]
+pub struct DefaultTableFactory {
+ stream: StreamTableFactory,
+ listing: ListingTableFactory,
+}
+
+impl DefaultTableFactory {
+ /// Creates a new [`DefaultTableFactory`]
+ pub fn new() -> Self {
+ Self::default()
+ }
+}
+
+#[async_trait]
+impl TableProviderFactory for DefaultTableFactory {
+ async fn create(
+ &self,
+ state: &SessionState,
+ cmd: &CreateExternalTable,
+ ) -> Result<Arc<dyn TableProvider>> {
+ let mut unbounded = cmd.unbounded;
+ for (k, v) in &cmd.options {
+ if k.eq_ignore_ascii_case("unbounded") && v.eq_ignore_ascii_case("true") {
+ unbounded = true
+ }
+ }
+
+ match unbounded {
+ true => self.stream.create(state, cmd).await,
+ false => self.listing.create(state, cmd).await,
+ }
+ }
+}
diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs
new file mode 100644
index 0000000000..cf95dd249a
--- /dev/null
+++ b/datafusion/core/src/datasource/stream.rs
@@ -0,0 +1,326 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! TableProvider for stream sources, such as FIFO files
+
+use std::any::Any;
+use std::fmt::Formatter;
+use std::fs::{File, OpenOptions};
+use std::io::BufReader;
+use std::path::PathBuf;
+use std::str::FromStr;
+use std::sync::Arc;
+
+use arrow_array::{RecordBatch, RecordBatchReader, RecordBatchWriter};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use futures::StreamExt;
+use tokio::task::spawn_blocking;
+
+use datafusion_common::{plan_err, DataFusionError, Result};
+use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+use datafusion_expr::{CreateExternalTable, Expr, TableType};
+use datafusion_physical_plan::common::AbortOnDropSingle;
+use datafusion_physical_plan::insert::{DataSink, FileSinkExec};
+use datafusion_physical_plan::metrics::MetricsSet;
+use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder;
+use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
+use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
+
+use crate::datasource::provider::TableProviderFactory;
+use crate::datasource::{create_ordering, TableProvider};
+use crate::execution::context::SessionState;
+
+/// A [`TableProviderFactory`] for [`StreamTable`]
+#[derive(Debug, Default)]
+pub struct StreamTableFactory {}
+
+#[async_trait]
+impl TableProviderFactory for StreamTableFactory {
+ async fn create(
+ &self,
+ state: &SessionState,
+ cmd: &CreateExternalTable,
+ ) -> Result<Arc<dyn TableProvider>> {
+ let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into());
+ let location = cmd.location.clone();
+ let encoding = cmd.file_type.parse()?;
+
+ let config = StreamConfig::new_file(schema, location.into())
+ .with_encoding(encoding)
+ .with_order(cmd.order_exprs.clone())
+ .with_header(cmd.has_header)
+ .with_batch_size(state.config().batch_size());
+
+ Ok(Arc::new(StreamTable(Arc::new(config))))
+ }
+}
+
+/// The data encoding for [`StreamTable`]
+#[derive(Debug, Clone)]
+pub enum StreamEncoding {
+ /// CSV records
+ Csv,
+ /// Newline-delimited JSON records
+ Json,
+}
+
+impl FromStr for StreamEncoding {
+ type Err = DataFusionError;
+
+ fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
+ match s.to_ascii_lowercase().as_str() {
+ "csv" => Ok(Self::Csv),
+ "json" => Ok(Self::Json),
+ _ => plan_err!("Unrecognised StreamEncoding {}", s),
+ }
+ }
+}
+
+/// The configuration for a [`StreamTable`]
+#[derive(Debug)]
+pub struct StreamConfig {
+ schema: SchemaRef,
+ location: PathBuf,
+ batch_size: usize,
+ encoding: StreamEncoding,
+ header: bool,
+ order: Vec<Vec<Expr>>,
+}
+
+impl StreamConfig {
+ /// Stream data from the file at `location`
+ pub fn new_file(schema: SchemaRef, location: PathBuf) -> Self {
+ Self {
+ schema,
+ location,
+ batch_size: 1024,
+ encoding: StreamEncoding::Csv,
+ order: vec![],
+ header: false,
+ }
+ }
+
+ /// Specify a sort order for the stream
+ pub fn with_order(mut self, order: Vec<Vec<Expr>>) -> Self {
+ self.order = order;
+ self
+ }
+
+ /// Specify the batch size
+ pub fn with_batch_size(mut self, batch_size: usize) -> Self {
+ self.batch_size = batch_size;
+ self
+ }
+
+ /// Specify whether the file has a header (only applicable for [`StreamEncoding::Csv`])
+ pub fn with_header(mut self, header: bool) -> Self {
+ self.header = header;
+ self
+ }
+
+ /// Specify an encoding for the stream
+ pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self {
+ self.encoding = encoding;
+ self
+ }
+
+ fn reader(&self) -> Result<Box<dyn RecordBatchReader>> {
+ let file = File::open(&self.location)?;
+ let schema = self.schema.clone();
+ match &self.encoding {
+ StreamEncoding::Csv => {
+ let reader = arrow::csv::ReaderBuilder::new(schema)
+ .with_header(self.header)
+ .with_batch_size(self.batch_size)
+ .build(file)?;
+
+ Ok(Box::new(reader))
+ }
+ StreamEncoding::Json => {
+ let reader = arrow::json::ReaderBuilder::new(schema)
+ .with_batch_size(self.batch_size)
+ .build(BufReader::new(file))?;
+
+ Ok(Box::new(reader))
+ }
+ }
+ }
+
+ fn writer(&self) -> Result<Box<dyn RecordBatchWriter>> {
+ match &self.encoding {
+ StreamEncoding::Csv => {
+ let header = self.header && !self.location.exists();
+ let file = OpenOptions::new().write(true).open(&self.location)?;
+ let writer = arrow::csv::WriterBuilder::new()
+ .with_header(header)
+ .build(file);
+
+ Ok(Box::new(writer))
+ }
+ StreamEncoding::Json => {
+ let file = OpenOptions::new().write(true).open(&self.location)?;
+ Ok(Box::new(arrow::json::LineDelimitedWriter::new(file)))
+ }
+ }
+ }
+}
+
+/// A [`TableProvider`] for a stream source, such as a FIFO file
+pub struct StreamTable(Arc<StreamConfig>);
+
+impl StreamTable {
+ /// Create a new [`StreamTable`] for the given `StreamConfig`
+ pub fn new(config: Arc<StreamConfig>) -> Self {
+ Self(config)
+ }
+}
+
+#[async_trait]
+impl TableProvider for StreamTable {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.0.schema.clone()
+ }
+
+ fn table_type(&self) -> TableType {
+ TableType::Base
+ }
+
+ async fn scan(
+ &self,
+ _state: &SessionState,
+ projection: Option<&Vec<usize>>,
+ _filters: &[Expr],
+ _limit: Option<usize>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let projected_schema = match projection {
+ Some(p) => {
+ let projected = self.0.schema.project(p)?;
+ create_ordering(&projected, &self.0.order)?
+ }
+ None => create_ordering(self.0.schema.as_ref(), &self.0.order)?,
+ };
+
+ Ok(Arc::new(StreamingTableExec::try_new(
+ self.0.schema.clone(),
+ vec![Arc::new(StreamRead(self.0.clone())) as _],
+ projection,
+ projected_schema,
+ true,
+ )?))
+ }
+
+ async fn insert_into(
+ &self,
+ _state: &SessionState,
+ input: Arc<dyn ExecutionPlan>,
+ _overwrite: bool,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let ordering = match self.0.order.first() {
+ Some(x) => {
+ let schema = self.0.schema.as_ref();
+ let orders = create_ordering(schema, std::slice::from_ref(x))?;
+ let ordering = orders.into_iter().next().unwrap();
+ Some(ordering.into_iter().map(Into::into).collect())
+ }
+ None => None,
+ };
+
+ Ok(Arc::new(FileSinkExec::new(
+ input,
+ Arc::new(StreamWrite(self.0.clone())),
+ self.0.schema.clone(),
+ ordering,
+ )))
+ }
+}
+
+struct StreamRead(Arc<StreamConfig>);
+
+impl PartitionStream for StreamRead {
+ fn schema(&self) -> &SchemaRef {
+ &self.0.schema
+ }
+
+ fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
+ let config = self.0.clone();
+ let schema = self.0.schema.clone();
+ let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2);
+ let tx = builder.tx();
+ builder.spawn_blocking(move || {
+ let reader = config.reader()?;
+ for b in reader {
+ if tx.blocking_send(b.map_err(Into::into)).is_err() {
+ break;
+ }
+ }
+ Ok(())
+ });
+ builder.build()
+ }
+}
+
+#[derive(Debug)]
+struct StreamWrite(Arc<StreamConfig>);
+
+impl DisplayAs for StreamWrite {
+ fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
+ write!(f, "{self:?}")
+ }
+}
+
+#[async_trait]
+impl DataSink for StreamWrite {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn metrics(&self) -> Option<MetricsSet> {
+ None
+ }
+
+ async fn write_all(
+ &self,
+ mut data: SendableRecordBatchStream,
+ _context: &Arc<TaskContext>,
+ ) -> Result<u64> {
+ let config = self.0.clone();
+ let (sender, mut receiver) = tokio::sync::mpsc::channel::<RecordBatch>(2);
+ // Note: FIFO Files support poll so this could use AsyncFd
+ let write = AbortOnDropSingle::new(spawn_blocking(move || {
+ let mut count = 0_u64;
+ let mut writer = config.writer()?;
+ while let Some(batch) = receiver.blocking_recv() {
+ count += batch.num_rows() as u64;
+ writer.write(&batch)?;
+ }
+ Ok(count)
+ }));
+
+ while let Some(b) = data.next().await.transpose()? {
+ if sender.send(b).await.is_err() {
+ break;
+ }
+ }
+ drop(sender);
+ write.await.unwrap()
+ }
+}
diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs
index 5c79c407b7..b8e111d361 100644
--- a/datafusion/core/src/execution/context/mod.rs
+++ b/datafusion/core/src/execution/context/mod.rs
@@ -27,7 +27,6 @@ use crate::{
catalog::{CatalogList, MemoryCatalogList},
datasource::{
listing::{ListingOptions, ListingTable},
- listing_table_factory::ListingTableFactory,
provider::TableProviderFactory,
},
datasource::{MemTable, ViewTable},
@@ -111,6 +110,7 @@ use datafusion_sql::planner::object_name_to_table_reference;
use uuid::Uuid;
// backwards compatibility
+use crate::datasource::provider::DefaultTableFactory;
use crate::execution::options::ArrowReadOptions;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
@@ -1285,12 +1285,12 @@ impl SessionState {
let mut table_factories: HashMap<String, Arc<dyn TableProviderFactory>> =
HashMap::new();
#[cfg(feature = "parquet")]
- table_factories.insert("PARQUET".into(), Arc::new(ListingTableFactory::new()));
- table_factories.insert("CSV".into(), Arc::new(ListingTableFactory::new()));
- table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new()));
- table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new()));
- table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new()));
- table_factories.insert("ARROW".into(), Arc::new(ListingTableFactory::new()));
+ table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new()));
+ table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new()));
if config.create_default_catalog_and_schema() {
let default_catalog = MemoryCatalogProvider::new();
diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs
index 7d9ea97f7b..93c7f73680 100644
--- a/datafusion/core/tests/fifo.rs
+++ b/datafusion/core/tests/fifo.rs
@@ -17,42 +17,48 @@
//! This test demonstrates the DataFusion FIFO capabilities.
//!
-#[cfg(not(target_os = "windows"))]
+#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
- use arrow::array::Array;
- use arrow::csv::ReaderBuilder;
- use arrow::datatypes::{DataType, Field, Schema};
- use datafusion::test_util::register_unbounded_file_with_ordering;
- use datafusion::{
- prelude::{CsvReadOptions, SessionConfig, SessionContext},
- test_util::{aggr_test_schema, arrow_test_data},
- };
- use datafusion_common::{exec_err, DataFusionError, Result};
- use futures::StreamExt;
- use itertools::enumerate;
- use nix::sys::stat;
- use nix::unistd;
- use rstest::*;
use std::fs::{File, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
- use std::thread::JoinHandle;
use std::time::{Duration, Instant};
+
+ use arrow::array::Array;
+ use arrow::csv::ReaderBuilder;
+ use arrow::datatypes::{DataType, Field, Schema};
+ use arrow_schema::SchemaRef;
+ use futures::StreamExt;
+ use nix::sys::stat;
+ use nix::unistd;
use tempfile::TempDir;
+ use tokio::task::{spawn_blocking, JoinHandle};
- // ! For the sake of the test, do not alter the numbers. !
- // Session batch size
- const TEST_BATCH_SIZE: usize = 20;
- // Number of lines written to FIFO
- const TEST_DATA_SIZE: usize = 20_000;
- // Number of lines what can be joined. Each joinable key produced 20 lines with
- // aggregate_test_100 dataset. We will use these joinable keys for understanding
- // incremental execution.
- const TEST_JOIN_RATIO: f64 = 0.01;
+ use datafusion::datasource::stream::{StreamConfig, StreamTable};
+ use datafusion::datasource::TableProvider;
+ use datafusion::{
+ prelude::{CsvReadOptions, SessionConfig, SessionContext},
+ test_util::{aggr_test_schema, arrow_test_data},
+ };
+ use datafusion_common::{exec_err, DataFusionError, Result};
+ use datafusion_expr::Expr;
+
+ /// Makes a TableProvider for a fifo file
+ fn fifo_table(
+ schema: SchemaRef,
+ path: impl Into<PathBuf>,
+ sort: Vec<Vec<Expr>>,
+ ) -> Arc<dyn TableProvider> {
+ let config = StreamConfig::new_file(schema, path.into())
+ .with_order(sort)
+ .with_batch_size(TEST_BATCH_SIZE)
+ .with_header(true);
+ Arc::new(StreamTable::new(Arc::new(config)))
+ }
fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result<PathBuf> {
let file_path = tmp_dir.path().join(file_name);
@@ -86,14 +92,46 @@ mod unix_test {
Ok(())
}
+ fn create_writing_thread(
+ file_path: PathBuf,
+ header: String,
+ lines: Vec<String>,
+ waiting_lock: Arc<AtomicBool>,
+ wait_until: usize,
+ ) -> JoinHandle<()> {
+ // Timeout for a long period of BrokenPipe error
+ let broken_pipe_timeout = Duration::from_secs(10);
+ let sa = file_path.clone();
+ // Spawn a new thread to write to the FIFO file
+ spawn_blocking(move || {
+ let file = OpenOptions::new().write(true).open(sa).unwrap();
+ // Reference time to use when deciding to fail the test
+ let execution_start = Instant::now();
+ write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
+ for (cnt, line) in lines.iter().enumerate() {
+ while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
+ thread::sleep(Duration::from_millis(50));
+ }
+ write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap();
+ }
+ drop(file);
+ })
+ }
+
+ // ! For the sake of the test, do not alter the numbers. !
+ // Session batch size
+ const TEST_BATCH_SIZE: usize = 20;
+ // Number of lines written to FIFO
+ const TEST_DATA_SIZE: usize = 20_000;
+ // Number of lines what can be joined. Each joinable key produced 20 lines with
+ // aggregate_test_100 dataset. We will use these joinable keys for understanding
+ // incremental execution.
+ const TEST_JOIN_RATIO: f64 = 0.01;
+
// This test provides a relatively realistic end-to-end scenario where
// we swap join sides to accommodate a FIFO source.
- #[rstest]
- #[timeout(std::time::Duration::from_secs(30))]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
- async fn unbounded_file_with_swapped_join(
- #[values(true, false)] unbounded_file: bool,
- ) -> Result<()> {
+ async fn unbounded_file_with_swapped_join() -> Result<()> {
// Create session context
let config = SessionConfig::new()
.with_batch_size(TEST_BATCH_SIZE)
@@ -101,11 +139,10 @@ mod unix_test {
.with_target_partitions(1);
let ctx = SessionContext::new_with_config(config);
// To make unbounded deterministic
- let waiting = Arc::new(AtomicBool::new(unbounded_file));
+ let waiting = Arc::new(AtomicBool::new(true));
// Create a new temporary FIFO file
let tmp_dir = TempDir::new()?;
- let fifo_path =
- create_fifo_file(&tmp_dir, &format!("fifo_{unbounded_file:?}.csv"))?;
+ let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?;
// Execution can calculated at least one RecordBatch after the number of
// "joinable_lines_length" lines are read.
let joinable_lines_length =
@@ -129,7 +166,7 @@ mod unix_test {
"a1,a2\n".to_owned(),
lines,
waiting.clone(),
- joinable_lines_length,
+ joinable_lines_length * 2,
);
// Data Schema
@@ -137,15 +174,10 @@ mod unix_test {
Field::new("a1", DataType::Utf8, false),
Field::new("a2", DataType::UInt32, false),
]));
- // Create a file with bounded or unbounded flag.
- ctx.register_csv(
- "left",
- fifo_path.as_os_str().to_str().unwrap(),
- CsvReadOptions::new()
- .schema(schema.as_ref())
- .mark_infinite(unbounded_file),
- )
- .await?;
+
+ let provider = fifo_table(schema, fifo_path, vec![]);
+ ctx.register_table("left", provider).unwrap();
+
// Register right table
let schema = aggr_test_schema();
let test_data = arrow_test_data();
@@ -161,7 +193,7 @@ mod unix_test {
while (stream.next().await).is_some() {
waiting.store(false, Ordering::SeqCst);
}
- task.join().unwrap();
+ task.await.unwrap();
Ok(())
}
@@ -172,39 +204,10 @@ mod unix_test {
Equal,
}
- fn create_writing_thread(
- file_path: PathBuf,
- header: String,
- lines: Vec<String>,
- waiting_lock: Arc<AtomicBool>,
- wait_until: usize,
- ) -> JoinHandle<()> {
- // Timeout for a long period of BrokenPipe error
- let broken_pipe_timeout = Duration::from_secs(10);
- // Spawn a new thread to write to the FIFO file
- thread::spawn(move || {
- let file = OpenOptions::new().write(true).open(file_path).unwrap();
- // Reference time to use when deciding to fail the test
- let execution_start = Instant::now();
- write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap();
- for (cnt, line) in enumerate(lines) {
- while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until {
- thread::sleep(Duration::from_millis(50));
- }
- write_to_fifo(&file, &line, execution_start, broken_pipe_timeout)
- .unwrap();
- }
- drop(file);
- })
- }
-
// This test provides a relatively realistic end-to-end scenario where
// we change the join into a [SymmetricHashJoin] to accommodate two
// unbounded (FIFO) sources.
- #[rstest]
- #[timeout(std::time::Duration::from_secs(30))]
- #[tokio::test(flavor = "multi_thread")]
- #[ignore]
+ #[tokio::test]
async fn unbounded_file_with_symmetric_join() -> Result<()> {
// Create session context
let config = SessionConfig::new()
@@ -254,47 +257,30 @@ mod unix_test {
Field::new("a1", DataType::UInt32, false),
Field::new("a2", DataType::UInt32, false),
]));
+
// Specify the ordering:
- let file_sort_order = vec![[datafusion_expr::col("a1")]
- .into_iter()
- .map(|e| {
- let ascending = true;
- let nulls_first = false;
- e.sort(ascending, nulls_first)
- })
- .collect::<Vec<_>>()];
+ let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]];
+
// Set unbounded sorted files read configuration
- register_unbounded_file_with_ordering(
- &ctx,
- schema.clone(),
- &left_fifo,
- "left",
- file_sort_order.clone(),
- true,
- )
- .await?;
- register_unbounded_file_with_ordering(
- &ctx,
- schema,
- &right_fifo,
- "right",
- file_sort_order,
- true,
- )
- .await?;
+ let provider = fifo_table(schema.clone(), left_fifo, order.clone());
+ ctx.register_table("left", provider)?;
+
+ let provider = fifo_table(schema.clone(), right_fifo, order);
+ ctx.register_table("right", provider)?;
+
// Execute the query, with no matching rows. (since key is modulus 10)
let df = ctx
.sql(
"SELECT
- t1.a1,
- t1.a2,
- t2.a1,
- t2.a2
- FROM
- left as t1 FULL
- JOIN right as t2 ON t1.a2 = t2.a2
- AND t1.a1 > t2.a1 + 4
- AND t1.a1 < t2.a1 + 9",
+ t1.a1,
+ t1.a2,
+ t2.a1,
+ t2.a2
+ FROM
+ left as t1 FULL
+ JOIN right as t2 ON t1.a2 = t2.a2
+ AND t1.a1 > t2.a1 + 4
+ AND t1.a1 < t2.a1 + 9",
)
.await?;
let mut stream = df.execute_stream().await?;
@@ -313,7 +299,8 @@ mod unix_test {
};
operations.push(op);
}
- tasks.into_iter().for_each(|jh| jh.join().unwrap());
+ futures::future::try_join_all(tasks).await.unwrap();
+
// The SymmetricHashJoin executor produces FULL join results at every
// pruning, which happens before it reaches the end of input and more
// than once. In this test, we feed partially joinable data to both
@@ -368,8 +355,9 @@ mod unix_test {
// Prevent move
let (sink_fifo_path_thread, sink_display_fifo_path) =
(sink_fifo_path.clone(), sink_fifo_path.display());
+
// Spawn a new thread to read sink EXTERNAL TABLE.
- tasks.push(thread::spawn(move || {
+ tasks.push(spawn_blocking(move || {
let file = File::open(sink_fifo_path_thread).unwrap();
let schema = Arc::new(Schema::new(vec![
Field::new("a1", DataType::Utf8, false),
@@ -377,7 +365,6 @@ mod unix_test {
]));
let mut reader = ReaderBuilder::new(schema)
- .with_header(true)
.with_batch_size(TEST_BATCH_SIZE)
.build(file)
.map_err(|e| DataFusionError::Internal(e.to_string()))
@@ -389,38 +376,35 @@ mod unix_test {
}));
// register second csv file with the SQL (create an empty file if not found)
ctx.sql(&format!(
- "CREATE EXTERNAL TABLE source_table (
+ "CREATE UNBOUNDED EXTERNAL TABLE source_table (
a1 VARCHAR NOT NULL,
a2 INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
- OPTIONS ('UNBOUNDED' 'TRUE')
LOCATION '{source_display_fifo_path}'"
))
.await?;
// register csv file with the SQL
ctx.sql(&format!(
- "CREATE EXTERNAL TABLE sink_table (
+ "CREATE UNBOUNDED EXTERNAL TABLE sink_table (
a1 VARCHAR NOT NULL,
a2 INT NOT NULL
)
STORED AS CSV
WITH HEADER ROW
- OPTIONS ('UNBOUNDED' 'TRUE')
LOCATION '{sink_display_fifo_path}'"
))
.await?;
let df = ctx
- .sql(
- "INSERT INTO sink_table
- SELECT a1, a2 FROM source_table",
- )
+ .sql("INSERT INTO sink_table SELECT a1, a2 FROM source_table")
.await?;
+
+ // Start execution
df.collect().await?;
- tasks.into_iter().for_each(|jh| jh.join().unwrap());
+ futures::future::try_join_all(tasks).await.unwrap();
Ok(())
}
}
diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs
index 1923a5f3ab..b0eaa2b42f 100644
--- a/datafusion/physical-plan/src/streaming.rs
+++ b/datafusion/physical-plan/src/streaming.rs
@@ -55,7 +55,7 @@ pub struct StreamingTableExec {
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<Arc<[usize]>>,
projected_schema: SchemaRef,
- projected_output_ordering: Option<LexOrdering>,
+ projected_output_ordering: Vec<LexOrdering>,
infinite: bool,
}
@@ -65,7 +65,7 @@ impl StreamingTableExec {
schema: SchemaRef,
partitions: Vec<Arc<dyn PartitionStream>>,
projection: Option<&Vec<usize>>,
- projected_output_ordering: Option<LexOrdering>,
+ projected_output_ordering: impl IntoIterator<Item = LexOrdering>,
infinite: bool,
) -> Result<Self> {
for x in partitions.iter() {
@@ -88,7 +88,7 @@ impl StreamingTableExec {
partitions,
projected_schema,
projection: projection.cloned().map(Into::into),
- projected_output_ordering,
+ projected_output_ordering: projected_output_ordering.into_iter().collect(),
infinite,
})
}
@@ -125,7 +125,7 @@ impl DisplayAs for StreamingTableExec {
}
self.projected_output_ordering
- .as_deref()
+ .first()
.map_or(Ok(()), |ordering| {
if !ordering.is_empty() {
write!(
@@ -160,15 +160,16 @@ impl ExecutionPlan for StreamingTableExec {
}
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.projected_output_ordering.as_deref()
+ self.projected_output_ordering
+ .first()
+ .map(|ordering| ordering.as_slice())
}
fn equivalence_properties(&self) -> EquivalenceProperties {
- let mut result = EquivalenceProperties::new(self.schema());
- if let Some(ordering) = &self.projected_output_ordering {
- result.add_new_orderings([ordering.clone()])
- }
- result
+ EquivalenceProperties::new_with_orderings(
+ self.schema(),
+ &self.projected_output_ordering,
+ )
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt
index ed4f4b4a11..682972b557 100644
--- a/datafusion/sqllogictest/test_files/ddl.slt
+++ b/datafusion/sqllogictest/test_files/ddl.slt
@@ -750,7 +750,7 @@ query TT
explain select c1 from t;
----
logical_plan TableScan: t projection=[c1]
-physical_plan CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], infinite_source=true, has_header=true
+physical_plan StreamingTableExec: partition_sizes=1, projection=[c1], infinite_source=true
statement ok
drop table t;
diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt
index 300e92a735..4438d69af3 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -2115,7 +2115,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, SUM(annotate
physical_plan
ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as summation1]
--AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
query III
@@ -2146,7 +2146,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotate
physical_plan
ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1]
--AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallySorted([1])
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST]
query III
SELECT a, d,
@@ -2179,7 +2179,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE(
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c]
--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
query III
SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c
@@ -2205,7 +2205,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c]
--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
query III
SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c
@@ -2232,7 +2232,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a
physical_plan
ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)@2 as last_c]
--AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted
-----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
query III
SELECT a, b, LAST_VALUE(c) as last_c
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 8be02b846c..a3c57a67a6 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2812,7 +2812,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2
----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), e [...]
--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NUL [...]
-----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
query IIII
@@ -2858,7 +2858,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2
----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(anno [...]
------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), e [...]
--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NUL [...]
-----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true
+----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
query IIII
@@ -2962,7 +2962,7 @@ ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, SUM(annotated_data_infinite2
------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AN [...]
--------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict [...]
----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, b@1 as b, c@2 as c, d@3 as d]
-------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
query IIIIIIIIIIIIIII
@@ -3104,7 +3104,7 @@ CoalesceBatchesExec: target_batch_size=4096
----GlobalLimitExec: skip=0, fetch=5
------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1]
--------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted]
-----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true
+----------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST]
# this is a negative test for asserting that window functions (other than ROW_NUMBER)
# are not added to ordering equivalence
@@ -3217,7 +3217,7 @@ ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_da
------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: [...]
--------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable [...]
----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullab [...]
-------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
statement ok
set datafusion.execution.target_partitions = 2;
@@ -3255,7 +3255,7 @@ ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_da
------------------------CoalesceBatchesExec: target_batch_size=4096
--------------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST
----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true
+------------------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST]
# reset the partition number 1 again
statement ok
@@ -3521,4 +3521,4 @@ SortPreservingMergeExec: [c@3 ASC NULLS LAST]
------CoalesceBatchesExec: target_batch_size=4096
--------SortPreservingRepartitionExec: partitioning=Hash([d@4], 2), input_partitions=2, sort_exprs=a@1 ASC NULLS LAST,b@2 ASC NULLS LAST
----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST], has_header=true
+------------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST]
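
As the reworked fifo test above shows, the same path is reachable from SQL once DefaultTableFactory is installed (see the execution/context change): an unbounded CREATE EXTERNAL TABLE is dispatched to StreamTableFactory, while a bounded one still goes through ListingTableFactory. A hedged sketch, with a placeholder path and table name:

    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    async fn create_unbounded_source(ctx: &SessionContext) -> Result<()> {
        // Either the UNBOUNDED keyword or OPTIONS ('UNBOUNDED' 'TRUE') selects
        // the stream table path in DefaultTableFactory.
        ctx.sql(
            "CREATE UNBOUNDED EXTERNAL TABLE fifo_source (
                 a1 VARCHAR NOT NULL,
                 a2 INT NOT NULL
             )
             STORED AS CSV
             WITH HEADER ROW
             LOCATION '/tmp/my_fifo.csv'",
        )
        .await?;
        Ok(())
    }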