You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/19 18:27:58 UTC
[arrow-datafusion] branch main updated: INSERT returns number of rows written, add `InsertExec` to handle common case. (#6354)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b16a4034b3 INSERT returns number of rows written, add `InsertExec` to handle common case. (#6354)
b16a4034b3 is described below
commit b16a4034b323cd50172fc07fc41eca94e8cebe93
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri May 19 14:27:52 2023 -0400
INSERT returns number of rows written, add `InsertExec` to handle common case. (#6354)
* Add InsertExec, port in memory insert to use DataSink
* fix: clippy
* Add Display to Sink and update plans
* Add additional verification that insert made it to the table
* Add test to ensure the sort order is maintained for insert query
* Ensure the sort order is maintained for insert query, test for same
---
datafusion/core/src/datasource/datasource.rs | 21 +-
datafusion/core/src/datasource/memory.rs | 119 ++++-
datafusion/core/src/physical_plan/insert.rs | 217 ++++++++
datafusion/core/src/physical_plan/memory.rs | 549 ---------------------
datafusion/core/src/physical_plan/mod.rs | 1 +
.../core/tests/sqllogictests/test_files/insert.slt | 236 +++++++++
6 files changed, 568 insertions(+), 575 deletions(-)
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 4560b3820c..11f30f33d1 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -98,7 +98,26 @@ pub trait TableProvider: Sync + Send {
None
}
- /// Insert into this table
+ /// Return an [`ExecutionPlan`] to insert data into this table, if
+ /// supported.
+ ///
+ /// The returned plan should return a single row in a UInt64
+ /// column called "count" such as the following
+ ///
+ /// ```text
+ /// +-------+,
+ /// | count |,
+ /// +-------+,
+ /// | 6 |,
+ /// +-------+,
+ /// ```
+ ///
+ /// # See Also
+ ///
+ /// See [`InsertExec`] for the common pattern of inserting a
+ /// single stream of `RecordBatch`es.
+ ///
+ /// [`InsertExec`]: crate::physical_plan::insert::InsertExec
async fn insert_into(
&self,
_state: &SessionState,
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index f41f8cb1bd..0b8afda333 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -19,6 +19,7 @@
use futures::StreamExt;
use std::any::Any;
+use std::fmt::{self, Debug, Display};
use std::sync::Arc;
use arrow::datatypes::SchemaRef;
@@ -30,11 +31,11 @@ use crate::datasource::{TableProvider, TableType};
use crate::error::{DataFusionError, Result};
use crate::execution::context::SessionState;
use crate::logical_expr::Expr;
-use crate::physical_plan::common;
use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::insert::{DataSink, InsertExec};
use crate::physical_plan::memory::MemoryExec;
-use crate::physical_plan::memory::MemoryWriteExec;
use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{common, SendableRecordBatchStream};
use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
/// Type alias for partition data
@@ -164,7 +165,8 @@ impl TableProvider for MemTable {
)?))
}
- /// Inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
+ /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
+ ///
/// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
///
/// # Arguments
@@ -174,7 +176,7 @@ impl TableProvider for MemTable {
///
/// # Returns
///
- /// * A `Result` indicating success or failure.
+ /// * A plan that returns the number of rows written.
async fn insert_into(
&self,
_state: &SessionState,
@@ -187,27 +189,61 @@ impl TableProvider for MemTable {
"Inserting query must have the same schema with the table.".to_string(),
));
}
+ let sink = Arc::new(MemSink::new(self.batches.clone()));
+ Ok(Arc::new(InsertExec::new(input, sink)))
+ }
+}
- if self.batches.is_empty() {
- return Err(DataFusionError::Plan(
- "The table must have partitions.".to_string(),
- ));
+/// Implements [`DataSink`] for writing to a [`MemTable`]
+struct MemSink {
+ /// Target locations for writing data
+ batches: Vec<PartitionData>,
+}
+
+impl Debug for MemSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MemSink")
+ .field("num_partitions", &self.batches.len())
+ .finish()
+ }
+}
+
+impl Display for MemSink {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ let partition_count = self.batches.len();
+ write!(f, "MemoryTable (partitions={partition_count})")
+ }
+}
+
+impl MemSink {
+ fn new(batches: Vec<PartitionData>) -> Self {
+ Self { batches }
+ }
+}
+
+#[async_trait]
+impl DataSink for MemSink {
+ async fn write_all(&self, mut data: SendableRecordBatchStream) -> Result<u64> {
+ let num_partitions = self.batches.len();
+
+ // buffer up the data round robin style into num_partitions
+
+ let mut new_batches = vec![vec![]; num_partitions];
+ let mut i = 0;
+ let mut row_count = 0;
+ while let Some(batch) = data.next().await.transpose()? {
+ row_count += batch.num_rows();
+ new_batches[i].push(batch);
+ i = (i + 1) % num_partitions;
}
- let input = if self.batches.len() > 1 {
- Arc::new(RepartitionExec::try_new(
- input,
- Partitioning::RoundRobinBatch(self.batches.len()),
- )?)
- } else {
- input
- };
+ // write the outputs into the batches
+ for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
+ // Append all the new batches in one go to minimize locking overhead
+ target.write().await.append(&mut batches);
+ }
- Ok(Arc::new(MemoryWriteExec::try_new(
- input,
- self.batches.clone(),
- self.schema.clone(),
- )?))
+ Ok(row_count as u64)
}
}
@@ -218,8 +254,8 @@ mod tests {
use crate::from_slice::FromSlice;
use crate::physical_plan::collect;
use crate::prelude::SessionContext;
- use arrow::array::Int32Array;
- use arrow::datatypes::{DataType, Field, Schema};
+ use arrow::array::{AsArray, Int32Array};
+ use arrow::datatypes::{DataType, Field, Schema, UInt64Type};
use arrow::error::ArrowError;
use datafusion_expr::LogicalPlanBuilder;
use futures::StreamExt;
@@ -457,6 +493,11 @@ mod tests {
initial_data: Vec<Vec<RecordBatch>>,
inserted_data: Vec<Vec<RecordBatch>>,
) -> Result<Vec<Vec<RecordBatch>>> {
+ let expected_count: u64 = inserted_data
+ .iter()
+ .flat_map(|batches| batches.iter().map(|batch| batch.num_rows() as u64))
+ .sum();
+
// Create a new session context
let session_ctx = SessionContext::new();
// Create and register the initial table with the provided schema and data
@@ -480,8 +521,8 @@ mod tests {
// Execute the physical plan and collect the results
let res = collect(plan, session_ctx.task_ctx()).await?;
- // Ensure the result is empty after the insert operation
- assert!(res.is_empty());
+ assert_eq!(extract_count(res), expected_count);
+
// Read the data from the initial table and store it in a vector of partitions
let mut partitions = vec![];
for partition in initial_table.batches.iter() {
@@ -491,6 +532,34 @@ mod tests {
Ok(partitions)
}
+ /// Returns the count stored in `res`. For example, returns 6 given the following
+ ///
+ /// ```text
+ /// +-------+,
+ /// | count |,
+ /// +-------+,
+ /// | 6 |,
+ /// +-------+,
+ /// ```
+ fn extract_count(res: Vec<RecordBatch>) -> u64 {
+ assert_eq!(res.len(), 1, "expected one batch, got {}", res.len());
+ let batch = &res[0];
+ assert_eq!(
+ batch.num_columns(),
+ 1,
+ "expected 1 column, got {}",
+ batch.num_columns()
+ );
+ let col = batch.column(0).as_primitive::<UInt64Type>();
+ assert_eq!(col.len(), 1, "expected 1 row, got {}", col.len());
+ let val = col
+ .iter()
+ .next()
+ .expect("had value")
+ .expect("expected non null");
+ val
+ }
+
// Test inserting a single batch of data into a single partition
#[tokio::test]
async fn test_insert_into_single_partition() -> Result<()> {
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
new file mode 100644
index 0000000000..904348c574
--- /dev/null
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing data to [`DataSink`]s
+
+use super::expressions::PhysicalSortExpr;
+use super::{
+ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+};
+use crate::error::Result;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, UInt64Array};
+use arrow_schema::{DataType, Field, Schema};
+use async_trait::async_trait;
+use core::fmt;
+use datafusion_physical_expr::PhysicalSortRequirement;
+use futures::StreamExt;
+use std::any::Any;
+use std::fmt::{Debug, Display};
+use std::sync::Arc;
+
+use crate::execution::context::TaskContext;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::Distribution;
+use datafusion_common::DataFusionError;
+
+/// `DataSink` implements writing streams of [`RecordBatch`]es to
+/// user defined destinations.
+///
+/// The `Display` impl is used to format the sink for explain plan
+/// output.
+#[async_trait]
+pub trait DataSink: Display + Debug + Send + Sync {
+ // TODO add desired input ordering
+ // How does this sink want its input ordered?
+
+ /// Writes the data to the sink, returns the number of rows written
+ ///
+ /// This method will be called exactly once during each DML
+ /// statement. Thus prior to return, the sink should do any commit
+ /// or rollback required.
+ async fn write_all(&self, data: SendableRecordBatchStream) -> Result<u64>;
+}
+
+/// Execution plan for writing record batches to a [`DataSink`]
+///
+/// Returns a single row with the number of rows written
+pub struct InsertExec {
+ /// Input plan that produces the record batches to be written.
+ input: Arc<dyn ExecutionPlan>,
+ /// Sink to whic to write
+ sink: Arc<dyn DataSink>,
+ /// Schema describing the structure of the data.
+ schema: SchemaRef,
+}
+
+impl fmt::Debug for InsertExec {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ write!(f, "InsertExec schema: {:?}", self.schema)
+ }
+}
+
+impl InsertExec {
+ /// Create a plan to write to `sink`
+ pub fn new(input: Arc<dyn ExecutionPlan>, sink: Arc<dyn DataSink>) -> Self {
+ Self {
+ input,
+ sink,
+ schema: make_count_schema(),
+ }
+ }
+}
+
+impl ExecutionPlan for InsertExec {
+ /// Return a reference to Any that can be used for downcasting
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ /// Get the schema for this execution plan
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn output_partitioning(&self) -> Partitioning {
+ Partitioning::UnknownPartitioning(1)
+ }
+
+ fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+ None
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::SinglePartition]
+ }
+
+ fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+ // Require that the InsertExec gets the data in the order the
+ // input produced it (otherwise the optimizer may choose to reorder
+ // the input which could result in unintended / poor UX)
+ //
+ // More rationale:
+ // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
+ vec![self
+ .input
+ .output_ordering()
+ .map(PhysicalSortRequirement::from_sort_exprs)]
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+ vec![self.input.clone()]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ Ok(Arc::new(Self {
+ input: children[0].clone(),
+ sink: self.sink.clone(),
+ schema: self.schema.clone(),
+ }))
+ }
+
+ /// Execute the plan and return a stream of `RecordBatch`es for
+ /// the specified partition.
+ fn execute(
+ &self,
+ partition: usize,
+ context: Arc<TaskContext>,
+ ) -> Result<SendableRecordBatchStream> {
+ if partition != 0 {
+ return Err(DataFusionError::Internal(
+ format!("Invalid requested partition {partition}. InsertExec requires a single input partition."
+ )));
+ }
+
+ // Execute each of our own input's partitions and pass them to the sink
+ let input_partition_count = self.input.output_partitioning().partition_count();
+ if input_partition_count != 1 {
+ return Err(DataFusionError::Internal(format!(
+ "Invalid input partition count {input_partition_count}. \
+ InsertExec needs only a single partition."
+ )));
+ }
+
+ let data = self.input.execute(0, context)?;
+ let schema = self.schema.clone();
+ let sink = self.sink.clone();
+
+ let stream = futures::stream::once(async move {
+ sink.write_all(data).await.map(make_count_batch)
+ })
+ .boxed();
+
+ Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+ }
+
+ fn fmt_as(
+ &self,
+ t: DisplayFormatType,
+ f: &mut std::fmt::Formatter,
+ ) -> std::fmt::Result {
+ match t {
+ DisplayFormatType::Default => {
+ write!(f, "InsertExec: sink={}", self.sink)
+ }
+ }
+ }
+
+ fn statistics(&self) -> Statistics {
+ Statistics::default()
+ }
+}
+
+/// Create an output record batch with a count
+///
+/// ```text
+/// +-------+,
+/// | count |,
+/// +-------+,
+/// | 6 |,
+/// +-------+,
+/// ```
+fn make_count_batch(count: u64) -> RecordBatch {
+ let array = Arc::new(UInt64Array::from_iter_values(vec![count])) as ArrayRef;
+
+ RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
+}
+
+fn make_count_schema() -> SchemaRef {
+ // define a schema.
+ Arc::new(Schema::new(vec![Field::new(
+ "count",
+ DataType::UInt64,
+ false,
+ )]))
+}
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index aae8ffbcb7..79d6f6c1cc 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -26,18 +26,13 @@ use crate::error::Result;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
use core::fmt;
-use futures::StreamExt;
use std::any::Any;
use std::sync::Arc;
use std::task::{Context, Poll};
-use crate::datasource::memory::PartitionData;
use crate::execution::context::TaskContext;
-use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use crate::physical_plan::Distribution;
use datafusion_common::DataFusionError;
use futures::Stream;
-use tokio::sync::RwLock;
/// Execution plan for reading in-memory batches of data
pub struct MemoryExec {
@@ -243,547 +238,3 @@ impl RecordBatchStream for MemoryStream {
self.schema.clone()
}
}
-
-/// Execution plan for writing record batches to an in-memory table.
-pub struct MemoryWriteExec {
- /// Input plan that produces the record batches to be written.
- input: Arc<dyn ExecutionPlan>,
- /// Reference to the MemTable's partition data.
- batches: Vec<PartitionData>,
- /// Schema describing the structure of the data.
- schema: SchemaRef,
-}
-
-impl fmt::Debug for MemoryWriteExec {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- write!(f, "schema: {:?}", self.schema)
- }
-}
-
-impl ExecutionPlan for MemoryWriteExec {
- /// Return a reference to Any that can be used for downcasting
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- /// Get the schema for this execution plan
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-
- fn output_partitioning(&self) -> Partitioning {
- Partitioning::UnknownPartitioning(
- self.input.output_partitioning().partition_count(),
- )
- }
-
- fn benefits_from_input_partitioning(&self) -> bool {
- false
- }
-
- fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
- self.input.output_ordering()
- }
-
- fn required_input_distribution(&self) -> Vec<Distribution> {
- // If the partition count of the MemTable is one, we want to require SinglePartition
- // since it would induce better plans in plan optimizer.
- if self.batches.len() == 1 {
- vec![Distribution::SinglePartition]
- } else {
- vec![Distribution::UnspecifiedDistribution]
- }
- }
-
- fn maintains_input_order(&self) -> Vec<bool> {
- // In theory, if MemTable partition count equals the input plans output partition count,
- // the Execution plan can preserve the order inside the partitions.
- vec![self.batches.len() == self.input.output_partitioning().partition_count()]
- }
-
- fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
- vec![self.input.clone()]
- }
-
- fn with_new_children(
- self: Arc<Self>,
- children: Vec<Arc<dyn ExecutionPlan>>,
- ) -> Result<Arc<dyn ExecutionPlan>> {
- Ok(Arc::new(MemoryWriteExec::try_new(
- children[0].clone(),
- self.batches.clone(),
- self.schema.clone(),
- )?))
- }
-
- /// Execute the plan and return a stream of record batches for the specified partition.
- /// Depending on the number of input partitions and MemTable partitions, it will choose
- /// either a less lock acquiring or a locked implementation.
- fn execute(
- &self,
- partition: usize,
- context: Arc<TaskContext>,
- ) -> Result<SendableRecordBatchStream> {
- let batch_count = self.batches.len();
- let data = self.input.execute(partition, context)?;
- let schema = self.schema.clone();
- let state = StreamState {
- data,
- buffer: vec![],
- batch: self.batches[partition % batch_count].clone(),
- };
-
- let stream = futures::stream::unfold(state, |mut state| async move {
- loop {
- match state.data.next().await {
- Some(Ok(batch)) => state.buffer.push(batch),
- Some(Err(e)) => return Some((Err(e), state)),
- None => {
- // stream is done, transfer all data to target PartitionData
- state.batch.write().await.append(&mut state.buffer);
- return None;
- }
- };
- }
- });
-
- let adapter = Box::pin(RecordBatchStreamAdapter::new(schema, stream))
- as SendableRecordBatchStream;
-
- Ok(adapter)
- }
-
- fn fmt_as(
- &self,
- t: DisplayFormatType,
- f: &mut std::fmt::Formatter,
- ) -> std::fmt::Result {
- match t {
- DisplayFormatType::Default => {
- write!(
- f,
- "MemoryWriteExec: partitions={}, input_partition={}",
- self.batches.len(),
- self.input.output_partitioning().partition_count()
- )
- }
- }
- }
-
- fn statistics(&self) -> Statistics {
- Statistics::default()
- }
-}
-
-struct StreamState {
- /// Input stream
- data: SendableRecordBatchStream,
- /// Data is buffered here until complete
- buffer: Vec<RecordBatch>,
- /// target
- batch: PartitionData,
-}
-
-impl MemoryWriteExec {
- /// Create a new execution plan for reading in-memory record batches
- /// The provided `schema` should not have the projection applied.
- pub fn try_new(
- plan: Arc<dyn ExecutionPlan>,
- batches: Vec<Arc<RwLock<Vec<RecordBatch>>>>,
- schema: SchemaRef,
- ) -> Result<Self> {
- Ok(Self {
- input: plan,
- batches,
- schema,
- })
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::datasource::streaming::PartitionStream;
- use crate::datasource::{MemTable, TableProvider};
- use crate::from_slice::FromSlice;
- use crate::physical_plan::stream::RecordBatchStreamAdapter;
- use crate::physical_plan::streaming::StreamingTableExec;
- use crate::physical_plan::ColumnStatistics;
- use crate::physical_plan::{collect, displayable, SendableRecordBatchStream};
- use crate::prelude::{CsvReadOptions, SessionContext};
- use crate::test_util;
- use arrow::array::Int32Array;
- use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
- use arrow::record_batch::RecordBatch;
- use datafusion_common::Result;
- use datafusion_execution::config::SessionConfig;
- use datafusion_execution::TaskContext;
- use futures::StreamExt;
- use std::sync::Arc;
-
- fn mock_data() -> Result<(SchemaRef, RecordBatch)> {
- let schema = Arc::new(Schema::new(vec![
- Field::new("a", DataType::Int32, false),
- Field::new("b", DataType::Int32, false),
- Field::new("c", DataType::Int32, true),
- Field::new("d", DataType::Int32, false),
- ]));
-
- let batch = RecordBatch::try_new(
- schema.clone(),
- vec![
- Arc::new(Int32Array::from_slice([1, 2, 3])),
- Arc::new(Int32Array::from_slice([4, 5, 6])),
- Arc::new(Int32Array::from(vec![None, None, Some(9)])),
- Arc::new(Int32Array::from_slice([7, 8, 9])),
- ],
- )?;
-
- Ok((schema, batch))
- }
-
- #[tokio::test]
- async fn test_with_projection() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
- let (schema, batch) = mock_data()?;
-
- let executor = MemoryExec::try_new(&[vec![batch]], schema, Some(vec![2, 1]))?;
- let statistics = executor.statistics();
-
- assert_eq!(statistics.num_rows, Some(3));
- assert_eq!(
- statistics.column_statistics,
- Some(vec![
- ColumnStatistics {
- null_count: Some(2),
- max_value: None,
- min_value: None,
- distinct_count: None,
- },
- ColumnStatistics {
- null_count: Some(0),
- max_value: None,
- min_value: None,
- distinct_count: None,
- },
- ])
- );
-
- // scan with projection
- let mut it = executor.execute(0, task_ctx)?;
- let batch2 = it.next().await.unwrap()?;
- assert_eq!(2, batch2.schema().fields().len());
- assert_eq!("c", batch2.schema().field(0).name());
- assert_eq!("b", batch2.schema().field(1).name());
- assert_eq!(2, batch2.num_columns());
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_without_projection() -> Result<()> {
- let session_ctx = SessionContext::new();
- let task_ctx = session_ctx.task_ctx();
- let (schema, batch) = mock_data()?;
-
- let executor = MemoryExec::try_new(&[vec![batch]], schema, None)?;
- let statistics = executor.statistics();
-
- assert_eq!(statistics.num_rows, Some(3));
- assert_eq!(
- statistics.column_statistics,
- Some(vec![
- ColumnStatistics {
- null_count: Some(0),
- max_value: None,
- min_value: None,
- distinct_count: None,
- },
- ColumnStatistics {
- null_count: Some(0),
- max_value: None,
- min_value: None,
- distinct_count: None,
- },
- ColumnStatistics {
- null_count: Some(2),
- max_value: None,
- min_value: None,
- distinct_count: None,
- },
- ColumnStatistics {
- null_count: Some(0),
- max_value: None,
- min_value: None,
- distinct_count: None,
- },
- ])
- );
-
- let mut it = executor.execute(0, task_ctx)?;
- let batch1 = it.next().await.unwrap()?;
- assert_eq!(4, batch1.schema().fields().len());
- assert_eq!(4, batch1.num_columns());
-
- Ok(())
- }
-
- #[tokio::test]
- async fn test_insert_into() -> Result<()> {
- // Create session context
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let testdata = test_util::arrow_test_data();
- let schema = test_util::aggr_test_schema();
- ctx.register_csv(
- "aggregate_test_100",
- &format!("{testdata}/csv/aggregate_test_100.csv"),
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
- ctx.sql(
- "CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)",
- )
- .await?;
-
- let sql = "INSERT INTO table_without_values SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
- FROM aggregate_test_100
- ORDER by c1
- ";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "MemoryWriteExec: partitions=1, input_partition=1",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]",
- " SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Ro [...]
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- #[tokio::test]
- async fn test_insert_into_as_select_multi_partitioned() -> Result<()> {
- // Create session context
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let testdata = test_util::arrow_test_data();
- let schema = test_util::aggr_test_schema();
- ctx.register_csv(
- "aggregate_test_100",
- &format!("{testdata}/csv/aggregate_test_100.csv"),
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
- ctx.sql(
- "CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)",
- )
- .await?;
-
- let sql = "INSERT INTO table_without_values SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
- FROM aggregate_test_100";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "MemoryWriteExec: partitions=1, input_partition=1",
- " CoalescePartitionsExec",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows [...]
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- // TODO: The generated plan is suboptimal since SortExec is in global state.
- #[tokio::test]
- async fn test_insert_into_as_select_single_partition() -> Result<()> {
- // Create session context
- let config = SessionConfig::new().with_target_partitions(8);
- let ctx = SessionContext::with_config(config);
- let testdata = test_util::arrow_test_data();
- let schema = test_util::aggr_test_schema();
- ctx.register_csv(
- "aggregate_test_100",
- &format!("{testdata}/csv/aggregate_test_100.csv"),
- CsvReadOptions::new().schema(&schema),
- )
- .await?;
- ctx.sql("CREATE TABLE table_without_values AS SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
- FROM aggregate_test_100")
- .await?;
-
- let sql = "INSERT INTO table_without_values SELECT
- SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
- COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
- FROM aggregate_test_100
- ORDER BY c1";
- let msg = format!("Creating logical plan for '{sql}'");
- let dataframe = ctx.sql(sql).await.expect(&msg);
- let physical_plan = dataframe.create_physical_plan().await?;
- let formatted = displayable(physical_plan.as_ref()).indent().to_string();
- let expected = {
- vec![
- "MemoryWriteExec: partitions=8, input_partition=8",
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
- " ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]",
- " SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
- " ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]",
- " BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: [...]
- " SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
- " CoalesceBatchesExec: target_batch_size=8192",
- " RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
- " RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
- ]
- };
-
- let actual: Vec<&str> = formatted.trim().lines().collect();
- let actual_len = actual.len();
- let actual_trim_last = &actual[..actual_len - 1];
- assert_eq!(
- expected, actual_trim_last,
- "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
- );
- Ok(())
- }
-
- // DummyPartition is a simple implementation of the PartitionStream trait.
- // It produces a stream of record batches with a fixed schema and the same content.
- struct DummyPartition {
- schema: SchemaRef,
- batch: RecordBatch,
- num_batches: usize,
- }
-
- impl PartitionStream for DummyPartition {
- // Return a reference to the schema of this partition.
- fn schema(&self) -> &SchemaRef {
- &self.schema
- }
-
- // Execute the partition stream, producing a stream of record batches.
- fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
- let batches = itertools::repeat_n(self.batch.clone(), self.num_batches);
- Box::pin(RecordBatchStreamAdapter::new(
- self.schema.clone(),
- futures::stream::iter(batches).map(Ok),
- ))
- }
- }
-
- // Test the less-lock mode by inserting a large number of batches into a table.
- #[tokio::test]
- async fn test_one_to_one_mode() -> Result<()> {
- let num_batches = 10000;
- // Create a new session context
- let session_ctx = SessionContext::new();
- // Create a new schema with one field called "a" of type Int32
- let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
- // Create a new batch of data to insert into the table
- let batch = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
- )?;
- let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
-
- let single_partition = Arc::new(DummyPartition {
- schema: schema.clone(),
- batch,
- num_batches,
- });
- let input = Arc::new(StreamingTableExec::try_new(
- schema.clone(),
- vec![single_partition],
- None,
- false,
- )?);
- let plan = initial_table
- .insert_into(&session_ctx.state(), input)
- .await?;
- let res = collect(plan, session_ctx.task_ctx()).await?;
- assert!(res.is_empty());
- // Ensure that the table now contains two batches of data in the same partition
- assert_eq!(initial_table.batches[0].read().await.len(), num_batches);
- Ok(())
- }
-
- // Test the locked mode by inserting a large number of batches into a table. It tests
- // where the table partition count is not equal to the input's output partition count.
- #[tokio::test]
- async fn test_locked_mode() -> Result<()> {
- let num_batches = 10000;
- // Create a new session context
- let session_ctx = SessionContext::new();
- // Create a new schema with one field called "a" of type Int32
- let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
- // Create a new batch of data to insert into the table
- let batch = RecordBatch::try_new(
- schema.clone(),
- vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
- )?;
- let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
-
- let single_partition = Arc::new(DummyPartition {
- schema: schema.clone(),
- batch,
- num_batches,
- });
- let input = Arc::new(StreamingTableExec::try_new(
- schema.clone(),
- vec![
- single_partition.clone(),
- single_partition.clone(),
- single_partition,
- ],
- None,
- false,
- )?);
- let plan = initial_table
- .insert_into(&session_ctx.state(), input)
- .await?;
- let res = collect(plan, session_ctx.task_ctx()).await?;
- assert!(res.is_empty());
- // Ensure that the table now contains two batches of data in the same partition
- assert_eq!(initial_table.batches[0].read().await.len(), num_batches * 3);
- Ok(())
- }
-}
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index c8edf701cf..bedbf84c17 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -663,6 +663,7 @@ pub mod empty;
pub mod explain;
pub mod file_format;
pub mod filter;
+pub mod insert;
pub mod joins;
pub mod limit;
pub mod memory;
diff --git a/datafusion/core/tests/sqllogictests/test_files/insert.slt b/datafusion/core/tests/sqllogictests/test_files/insert.slt
new file mode 100644
index 0000000000..dae5bb94f4
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/insert.slt
@@ -0,0 +1,236 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## INSERT tests
+##########
+
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+ c1 VARCHAR NOT NULL,
+ c2 TINYINT NOT NULL,
+ c3 SMALLINT NOT NULL,
+ c4 SMALLINT,
+ c5 INT,
+ c6 BIGINT NOT NULL,
+ c7 SMALLINT NOT NULL,
+ c8 INT NOT NULL,
+ c9 BIGINT UNSIGNED NOT NULL,
+ c10 VARCHAR NOT NULL,
+ c11 FLOAT NOT NULL,
+ c12 DOUBLE NOT NULL,
+ c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+# test_insert_into
+
+statement ok
+set datafusion.execution.target_partitions = 8;
+
+statement ok
+CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL);
+
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+FROM aggregate_test_100
+ORDER by c1
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=1)
+--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
+----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Prec [...]
+----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 8), input_partitions=8
+----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+query I
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+FROM aggregate_test_100
+ORDER by c1
+----
+100
+
+# verify there is data now in the table
+query I
+SELECT COUNT(*) from table_without_values;
+----
+100
+
+# verify the inserted values (first 5 rows ordered by field1, field2)
+query II
+SELECT *
+FROM table_without_values
+ORDER BY field1, field2
+LIMIT 5;
+----
+-70111 3
+-65362 3
+-62295 3
+-56721 3
+-55414 3
+
+statement ok
+drop table table_without_values;
+
+
+
+# test_insert_into_as_select_multi_partitioned
+statement ok
+CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
+
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=1)
+--CoalescePartitionsExec
+----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preced [...]
+--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 8), input_partitions=8
+--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+
+
+query I
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+----
+100
+
+statement ok
+drop table table_without_values;
+
+
+# test_insert_into_as_select_single_partition
+
+statement ok
+CREATE TABLE table_without_values AS SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+
+
+# TODO: The generated plan is suboptimal since SortExec is in global state.
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+ORDER BY c1
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: a1 AS a1, a2 AS a2
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=8)
+--ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
+----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Prec [...]
+----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 8), input_partitions=8
+----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+
+query I
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+ORDER BY c1
+----
+100
+
+
+statement ok
+drop table table_without_values;
+
+# test_insert_into_with_sort
+
+statement ok
+create table table_without_values(c1 varchar not null);
+
+# verify that the sort order of the insert query is preserved through the
+# insert (there should be a SortExec in the following plan)
+# See https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 for more background
+query TT
+explain insert into table_without_values select c1 from aggregate_test_100 order by c1;
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: aggregate_test_100.c1 AS c1
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------TableScan: aggregate_test_100 projection=[c1]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=1)
+--ProjectionExec: expr=[c1@0 as c1]
+----SortExec: expr=[c1@0 ASC NULLS LAST]
+------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
+
+query I
+insert into table_without_values select c1 from aggregate_test_100 order by c1;
+----
+100
+
+query I
+select count(*) from table_without_values;
+----
+100
+
+statement ok
+drop table table_without_values;