Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/19 18:27:58 UTC

[arrow-datafusion] branch main updated: INSERT returns number of rows written, add `InsertExec` to handle common case. (#6354)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b16a4034b3 INSERT returns number of rows written, add `InsertExec` to handle common case.  (#6354)
b16a4034b3 is described below

commit b16a4034b323cd50172fc07fc41eca94e8cebe93
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri May 19 14:27:52 2023 -0400

    INSERT returns number of rows written, add `InsertExec` to handle common case.  (#6354)
    
    * Add InsertExec, port in memory insert to use DataSink
    
    * fix: clippy
    
    * Add Display to Sink and update plans
    
    * Add additional verification that insert made it to the table
    
    * Add test to ensure the sort order is maintained for insert query
    
    * Ensure the sort order is maintained for insert query, test for same
---
 datafusion/core/src/datasource/datasource.rs       |  21 +-
 datafusion/core/src/datasource/memory.rs           | 119 ++++-
 datafusion/core/src/physical_plan/insert.rs        | 217 ++++++++
 datafusion/core/src/physical_plan/memory.rs        | 549 ---------------------
 datafusion/core/src/physical_plan/mod.rs           |   1 +
 .../core/tests/sqllogictests/test_files/insert.slt | 236 +++++++++
 6 files changed, 568 insertions(+), 575 deletions(-)

diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 4560b3820c..11f30f33d1 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -98,7 +98,26 @@ pub trait TableProvider: Sync + Send {
         None
     }
 
-    /// Insert into this table
+    /// Return an [`ExecutionPlan`] to insert data into this table, if
+    /// supported.
+    ///
+    /// The returned plan should return a single row in a UInt64
+    /// column called "count" such as the following
+    ///
+    /// ```text
+    /// +-------+
+    /// | count |
+    /// +-------+
+    /// | 6     |
+    /// +-------+
+    /// ```
+    ///
+    /// # See Also
+    ///
+    /// See [`InsertExec`] for the common pattern of inserting a
+    /// single stream of `RecordBatch`es.
+    ///
+    /// [`InsertExec`]: crate::physical_plan::insert::InsertExec
     async fn insert_into(
         &self,
         _state: &SessionState,
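
The doc comment above pins down the contract of the returned plan: a single row, in a UInt64 column named "count". A minimal sketch of consuming that contract from outside the crate might look as follows (the `insert_and_count` helper and its use of `SessionContext` are illustrative only, not part of this commit):

```rust
use std::sync::Arc;

use datafusion::arrow::array::AsArray;
use datafusion::arrow::datatypes::UInt64Type;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;

/// Execute the plan returned by `TableProvider::insert_into` and read back the
/// single "count" value it is documented to produce.
async fn insert_and_count(
    ctx: &SessionContext,
    table: Arc<dyn TableProvider>,
    input: Arc<dyn ExecutionPlan>,
) -> Result<u64> {
    let state = ctx.state();
    let plan = table.insert_into(&state, input).await?;
    let batches = collect(plan, ctx.task_ctx()).await?;
    // Expect exactly one batch with one UInt64 "count" column and one row
    let count = batches[0].column(0).as_primitive::<UInt64Type>().value(0);
    Ok(count)
}
```

This mirrors the `extract_count` helper added to the `MemTable` tests below.
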
diff --git a/datafusion/core/src/datasource/memory.rs b/datafusion/core/src/datasource/memory.rs
index f41f8cb1bd..0b8afda333 100644
--- a/datafusion/core/src/datasource/memory.rs
+++ b/datafusion/core/src/datasource/memory.rs
@@ -19,6 +19,7 @@
 
 use futures::StreamExt;
 use std::any::Any;
+use std::fmt::{self, Debug, Display};
 use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
@@ -30,11 +31,11 @@ use crate::datasource::{TableProvider, TableType};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
-use crate::physical_plan::common;
 use crate::physical_plan::common::AbortOnDropSingle;
+use crate::physical_plan::insert::{DataSink, InsertExec};
 use crate::physical_plan::memory::MemoryExec;
-use crate::physical_plan::memory::MemoryWriteExec;
 use crate::physical_plan::ExecutionPlan;
+use crate::physical_plan::{common, SendableRecordBatchStream};
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
 
 /// Type alias for partition data
@@ -164,7 +165,8 @@ impl TableProvider for MemTable {
         )?))
     }
 
-    /// Inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
+    /// Returns an ExecutionPlan that inserts the execution results of a given [`ExecutionPlan`] into this [`MemTable`].
+    ///
     /// The [`ExecutionPlan`] must have the same schema as this [`MemTable`].
     ///
     /// # Arguments
@@ -174,7 +176,7 @@ impl TableProvider for MemTable {
     ///
     /// # Returns
     ///
-    /// * A `Result` indicating success or failure.
+    /// * A plan that returns the number of rows written.
     async fn insert_into(
         &self,
         _state: &SessionState,
@@ -187,27 +189,61 @@ impl TableProvider for MemTable {
                 "Inserting query must have the same schema with the table.".to_string(),
             ));
         }
+        let sink = Arc::new(MemSink::new(self.batches.clone()));
+        Ok(Arc::new(InsertExec::new(input, sink)))
+    }
+}
 
-        if self.batches.is_empty() {
-            return Err(DataFusionError::Plan(
-                "The table must have partitions.".to_string(),
-            ));
+/// Implements [`DataSink`] for writing to a [`MemTable`]
+struct MemSink {
+    /// Target locations for writing data
+    batches: Vec<PartitionData>,
+}
+
+impl Debug for MemSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MemSink")
+            .field("num_partitions", &self.batches.len())
+            .finish()
+    }
+}
+
+impl Display for MemSink {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        let partition_count = self.batches.len();
+        write!(f, "MemoryTable (partitions={partition_count})")
+    }
+}
+
+impl MemSink {
+    fn new(batches: Vec<PartitionData>) -> Self {
+        Self { batches }
+    }
+}
+
+#[async_trait]
+impl DataSink for MemSink {
+    async fn write_all(&self, mut data: SendableRecordBatchStream) -> Result<u64> {
+        let num_partitions = self.batches.len();
+
+        // buffer up the data round robin style into num_partitions
+
+        let mut new_batches = vec![vec![]; num_partitions];
+        let mut i = 0;
+        let mut row_count = 0;
+        while let Some(batch) = data.next().await.transpose()? {
+            row_count += batch.num_rows();
+            new_batches[i].push(batch);
+            i = (i + 1) % num_partitions;
         }
 
-        let input = if self.batches.len() > 1 {
-            Arc::new(RepartitionExec::try_new(
-                input,
-                Partitioning::RoundRobinBatch(self.batches.len()),
-            )?)
-        } else {
-            input
-        };
+        // write the outputs into the batches
+        for (target, mut batches) in self.batches.iter().zip(new_batches.into_iter()) {
+            // Append all the new batches in one go to minimize locking overhead
+            target.write().await.append(&mut batches);
+        }
 
-        Ok(Arc::new(MemoryWriteExec::try_new(
-            input,
-            self.batches.clone(),
-            self.schema.clone(),
-        )?))
+        Ok(row_count as u64)
     }
 }
 
@@ -218,8 +254,8 @@ mod tests {
     use crate::from_slice::FromSlice;
     use crate::physical_plan::collect;
     use crate::prelude::SessionContext;
-    use arrow::array::Int32Array;
-    use arrow::datatypes::{DataType, Field, Schema};
+    use arrow::array::{AsArray, Int32Array};
+    use arrow::datatypes::{DataType, Field, Schema, UInt64Type};
     use arrow::error::ArrowError;
     use datafusion_expr::LogicalPlanBuilder;
     use futures::StreamExt;
@@ -457,6 +493,11 @@ mod tests {
         initial_data: Vec<Vec<RecordBatch>>,
         inserted_data: Vec<Vec<RecordBatch>>,
     ) -> Result<Vec<Vec<RecordBatch>>> {
+        let expected_count: u64 = inserted_data
+            .iter()
+            .flat_map(|batches| batches.iter().map(|batch| batch.num_rows() as u64))
+            .sum();
+
         // Create a new session context
         let session_ctx = SessionContext::new();
         // Create and register the initial table with the provided schema and data
@@ -480,8 +521,8 @@ mod tests {
 
         // Execute the physical plan and collect the results
         let res = collect(plan, session_ctx.task_ctx()).await?;
-        // Ensure the result is empty after the insert operation
-        assert!(res.is_empty());
+        assert_eq!(extract_count(res), expected_count);
+
         // Read the data from the initial table and store it in a vector of partitions
         let mut partitions = vec![];
         for partition in initial_table.batches.iter() {
@@ -491,6 +532,34 @@ mod tests {
         Ok(partitions)
     }
 
+    /// Returns the count value from the results. For example, returns 6 given the following
+    ///
+    /// ```text
+    /// +-------+
+    /// | count |
+    /// +-------+
+    /// | 6     |
+    /// +-------+
+    /// ```
+    fn extract_count(res: Vec<RecordBatch>) -> u64 {
+        assert_eq!(res.len(), 1, "expected one batch, got {}", res.len());
+        let batch = &res[0];
+        assert_eq!(
+            batch.num_columns(),
+            1,
+            "expected 1 column, got {}",
+            batch.num_columns()
+        );
+        let col = batch.column(0).as_primitive::<UInt64Type>();
+        assert_eq!(col.len(), 1, "expected 1 row, got {}", col.len());
+        let val = col
+            .iter()
+            .next()
+            .expect("had value")
+            .expect("expected non null");
+        val
+    }
+
     // Test inserting a single batch of data into a single partition
     #[tokio::test]
     async fn test_insert_into_single_partition() -> Result<()> {
diff --git a/datafusion/core/src/physical_plan/insert.rs b/datafusion/core/src/physical_plan/insert.rs
new file mode 100644
index 0000000000..904348c574
--- /dev/null
+++ b/datafusion/core/src/physical_plan/insert.rs
@@ -0,0 +1,217 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing data to [`DataSink`]s
+
+use super::expressions::PhysicalSortExpr;
+use super::{
+    DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
+};
+use crate::error::Result;
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use arrow_array::{ArrayRef, UInt64Array};
+use arrow_schema::{DataType, Field, Schema};
+use async_trait::async_trait;
+use core::fmt;
+use datafusion_physical_expr::PhysicalSortRequirement;
+use futures::StreamExt;
+use std::any::Any;
+use std::fmt::{Debug, Display};
+use std::sync::Arc;
+
+use crate::execution::context::TaskContext;
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
+use crate::physical_plan::Distribution;
+use datafusion_common::DataFusionError;
+
+/// `DataSink` implements writing streams of [`RecordBatch`]es to
+/// user-defined destinations.
+///
+/// The `Display` impl is used to format the sink for explain plan
+/// output.
+#[async_trait]
+pub trait DataSink: Display + Debug + Send + Sync {
+    // TODO add desired input ordering
+    // How does this sink want its input ordered?
+
+    /// Writes the data to the sink, returns the number of values written
+    ///
+    /// This method will be called exactly once during each DML
+    /// statement. Thus prior to return, the sink should do any commit
+    /// or rollback required.
+    async fn write_all(&self, data: SendableRecordBatchStream) -> Result<u64>;
+}
+
+/// Execution plan for writing record batches to a [`DataSink`]
+///
+/// Returns a single row with the number of values written
+pub struct InsertExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Sink to which to write
+    sink: Arc<dyn DataSink>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for InsertExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "InsertExec schema: {:?}", self.schema)
+    }
+}
+
+impl InsertExec {
+    /// Create a plan to write to `sink`
+    pub fn new(input: Arc<dyn ExecutionPlan>, sink: Arc<dyn DataSink>) -> Self {
+        Self {
+            input,
+            sink,
+            schema: make_count_schema(),
+        }
+    }
+}
+
+impl ExecutionPlan for InsertExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(1)
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        None
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        vec![Distribution::SinglePartition]
+    }
+
+    fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
+        // Require that the InsertExec gets the data in the order the
+        // input produced it (otherwise the optimizer may choose to reorder
+        // the input which could result in unintended / poor UX)
+        //
+        // More rationale:
+        // https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178
+        vec![self
+            .input
+            .output_ordering()
+            .map(PhysicalSortRequirement::from_sort_exprs)]
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        vec![false]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(Self {
+            input: children[0].clone(),
+            sink: self.sink.clone(),
+            schema: self.schema.clone(),
+        }))
+    }
+
+    /// Execute the plan and return a stream of `RecordBatch`es for
+    /// the specified partition.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        if partition != 0 {
+            return Err(DataFusionError::Internal(
+                format!("Invalid requested partition {partition}. InsertExec requires a single input partition."
+                )));
+        }
+
+        // The input must produce a single partition; execute it and hand the stream to the sink
+        let input_partition_count = self.input.output_partitioning().partition_count();
+        if input_partition_count != 1 {
+            return Err(DataFusionError::Internal(format!(
+                "Invalid input partition count {input_partition_count}. \
+                         InsertExec needs only a single partition."
+            )));
+        }
+
+        let data = self.input.execute(0, context)?;
+        let schema = self.schema.clone();
+        let sink = self.sink.clone();
+
+        let stream = futures::stream::once(async move {
+            sink.write_all(data).await.map(make_count_batch)
+        })
+        .boxed();
+
+        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(f, "InsertExec: sink={}", self.sink)
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+/// Create an output record batch with a count
+///
+/// ```text
+/// +-------+
+/// | count |
+/// +-------+
+/// | 6     |
+/// +-------+
+/// ```
+fn make_count_batch(count: u64) -> RecordBatch {
+    let array = Arc::new(UInt64Array::from_iter_values(vec![count])) as ArrayRef;
+
+    RecordBatch::try_from_iter_with_nullable(vec![("count", array, false)]).unwrap()
+}
+
+fn make_count_schema() -> SchemaRef {
+    // define a schema.
+    Arc::new(Schema::new(vec![Field::new(
+        "count",
+        DataType::UInt64,
+        false,
+    )]))
+}
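
Putting `DataSink` and `InsertExec` together, a custom sink outside the crate could look roughly like the sketch below (the `CollectingSink` type and `write_plan` helper are hypothetical; `MemSink` in the following file is the real in-tree example):

```rust
use std::fmt::{self, Display};
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::insert::{DataSink, InsertExec};
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use futures::StreamExt;
use tokio::sync::Mutex;

/// Hypothetical sink that buffers all incoming batches in memory and
/// reports how many rows were written.
#[derive(Debug, Default)]
struct CollectingSink {
    batches: Mutex<Vec<RecordBatch>>,
}

impl Display for CollectingSink {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "CollectingSink")
    }
}

#[async_trait]
impl DataSink for CollectingSink {
    async fn write_all(&self, mut data: SendableRecordBatchStream) -> Result<u64> {
        let mut row_count = 0u64;
        let mut buffered = vec![];
        while let Some(batch) = data.next().await.transpose()? {
            row_count += batch.num_rows() as u64;
            buffered.push(batch);
        }
        // Take the lock once and append everything at the end
        self.batches.lock().await.append(&mut buffered);
        Ok(row_count)
    }
}

/// Wrap an input plan so that executing the result performs the write and
/// yields the single-row "count" batch.
fn write_plan(input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
    Arc::new(InsertExec::new(input, Arc::new(CollectingSink::default())))
}
```

Executing the wrapping plan drives `write_all` exactly once and yields the single-row count batch built by `make_count_batch`.
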
diff --git a/datafusion/core/src/physical_plan/memory.rs b/datafusion/core/src/physical_plan/memory.rs
index aae8ffbcb7..79d6f6c1cc 100644
--- a/datafusion/core/src/physical_plan/memory.rs
+++ b/datafusion/core/src/physical_plan/memory.rs
@@ -26,18 +26,13 @@ use crate::error::Result;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use core::fmt;
-use futures::StreamExt;
 use std::any::Any;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::datasource::memory::PartitionData;
 use crate::execution::context::TaskContext;
-use crate::physical_plan::stream::RecordBatchStreamAdapter;
-use crate::physical_plan::Distribution;
 use datafusion_common::DataFusionError;
 use futures::Stream;
-use tokio::sync::RwLock;
 
 /// Execution plan for reading in-memory batches of data
 pub struct MemoryExec {
@@ -243,547 +238,3 @@ impl RecordBatchStream for MemoryStream {
         self.schema.clone()
     }
 }
-
-/// Execution plan for writing record batches to an in-memory table.
-pub struct MemoryWriteExec {
-    /// Input plan that produces the record batches to be written.
-    input: Arc<dyn ExecutionPlan>,
-    /// Reference to the MemTable's partition data.
-    batches: Vec<PartitionData>,
-    /// Schema describing the structure of the data.
-    schema: SchemaRef,
-}
-
-impl fmt::Debug for MemoryWriteExec {
-    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        write!(f, "schema: {:?}", self.schema)
-    }
-}
-
-impl ExecutionPlan for MemoryWriteExec {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    /// Get the schema for this execution plan
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-
-    fn output_partitioning(&self) -> Partitioning {
-        Partitioning::UnknownPartitioning(
-            self.input.output_partitioning().partition_count(),
-        )
-    }
-
-    fn benefits_from_input_partitioning(&self) -> bool {
-        false
-    }
-
-    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
-        self.input.output_ordering()
-    }
-
-    fn required_input_distribution(&self) -> Vec<Distribution> {
-        // If the partition count of the MemTable is one, we want to require SinglePartition
-        // since it would induce better plans in plan optimizer.
-        if self.batches.len() == 1 {
-            vec![Distribution::SinglePartition]
-        } else {
-            vec![Distribution::UnspecifiedDistribution]
-        }
-    }
-
-    fn maintains_input_order(&self) -> Vec<bool> {
-        // In theory, if MemTable partition count equals the input plans output partition count,
-        // the Execution plan can preserve the order inside the partitions.
-        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
-    }
-
-    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
-        vec![self.input.clone()]
-    }
-
-    fn with_new_children(
-        self: Arc<Self>,
-        children: Vec<Arc<dyn ExecutionPlan>>,
-    ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(MemoryWriteExec::try_new(
-            children[0].clone(),
-            self.batches.clone(),
-            self.schema.clone(),
-        )?))
-    }
-
-    /// Execute the plan and return a stream of record batches for the specified partition.
-    /// Depending on the number of input partitions and MemTable partitions, it will choose
-    /// either a less lock acquiring or a locked implementation.
-    fn execute(
-        &self,
-        partition: usize,
-        context: Arc<TaskContext>,
-    ) -> Result<SendableRecordBatchStream> {
-        let batch_count = self.batches.len();
-        let data = self.input.execute(partition, context)?;
-        let schema = self.schema.clone();
-        let state = StreamState {
-            data,
-            buffer: vec![],
-            batch: self.batches[partition % batch_count].clone(),
-        };
-
-        let stream = futures::stream::unfold(state, |mut state| async move {
-            loop {
-                match state.data.next().await {
-                    Some(Ok(batch)) => state.buffer.push(batch),
-                    Some(Err(e)) => return Some((Err(e), state)),
-                    None => {
-                        // stream is done, transfer all data to target PartitionData
-                        state.batch.write().await.append(&mut state.buffer);
-                        return None;
-                    }
-                };
-            }
-        });
-
-        let adapter = Box::pin(RecordBatchStreamAdapter::new(schema, stream))
-            as SendableRecordBatchStream;
-
-        Ok(adapter)
-    }
-
-    fn fmt_as(
-        &self,
-        t: DisplayFormatType,
-        f: &mut std::fmt::Formatter,
-    ) -> std::fmt::Result {
-        match t {
-            DisplayFormatType::Default => {
-                write!(
-                    f,
-                    "MemoryWriteExec: partitions={}, input_partition={}",
-                    self.batches.len(),
-                    self.input.output_partitioning().partition_count()
-                )
-            }
-        }
-    }
-
-    fn statistics(&self) -> Statistics {
-        Statistics::default()
-    }
-}
-
-struct StreamState {
-    /// Input stream
-    data: SendableRecordBatchStream,
-    /// Data is buffered here until complete
-    buffer: Vec<RecordBatch>,
-    /// target
-    batch: PartitionData,
-}
-
-impl MemoryWriteExec {
-    /// Create a new execution plan for reading in-memory record batches
-    /// The provided `schema` should not have the projection applied.
-    pub fn try_new(
-        plan: Arc<dyn ExecutionPlan>,
-        batches: Vec<Arc<RwLock<Vec<RecordBatch>>>>,
-        schema: SchemaRef,
-    ) -> Result<Self> {
-        Ok(Self {
-            input: plan,
-            batches,
-            schema,
-        })
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::datasource::streaming::PartitionStream;
-    use crate::datasource::{MemTable, TableProvider};
-    use crate::from_slice::FromSlice;
-    use crate::physical_plan::stream::RecordBatchStreamAdapter;
-    use crate::physical_plan::streaming::StreamingTableExec;
-    use crate::physical_plan::ColumnStatistics;
-    use crate::physical_plan::{collect, displayable, SendableRecordBatchStream};
-    use crate::prelude::{CsvReadOptions, SessionContext};
-    use crate::test_util;
-    use arrow::array::Int32Array;
-    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-    use arrow::record_batch::RecordBatch;
-    use datafusion_common::Result;
-    use datafusion_execution::config::SessionConfig;
-    use datafusion_execution::TaskContext;
-    use futures::StreamExt;
-    use std::sync::Arc;
-
-    fn mock_data() -> Result<(SchemaRef, RecordBatch)> {
-        let schema = Arc::new(Schema::new(vec![
-            Field::new("a", DataType::Int32, false),
-            Field::new("b", DataType::Int32, false),
-            Field::new("c", DataType::Int32, true),
-            Field::new("d", DataType::Int32, false),
-        ]));
-
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![
-                Arc::new(Int32Array::from_slice([1, 2, 3])),
-                Arc::new(Int32Array::from_slice([4, 5, 6])),
-                Arc::new(Int32Array::from(vec![None, None, Some(9)])),
-                Arc::new(Int32Array::from_slice([7, 8, 9])),
-            ],
-        )?;
-
-        Ok((schema, batch))
-    }
-
-    #[tokio::test]
-    async fn test_with_projection() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let (schema, batch) = mock_data()?;
-
-        let executor = MemoryExec::try_new(&[vec![batch]], schema, Some(vec![2, 1]))?;
-        let statistics = executor.statistics();
-
-        assert_eq!(statistics.num_rows, Some(3));
-        assert_eq!(
-            statistics.column_statistics,
-            Some(vec![
-                ColumnStatistics {
-                    null_count: Some(2),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-            ])
-        );
-
-        // scan with projection
-        let mut it = executor.execute(0, task_ctx)?;
-        let batch2 = it.next().await.unwrap()?;
-        assert_eq!(2, batch2.schema().fields().len());
-        assert_eq!("c", batch2.schema().field(0).name());
-        assert_eq!("b", batch2.schema().field(1).name());
-        assert_eq!(2, batch2.num_columns());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_without_projection() -> Result<()> {
-        let session_ctx = SessionContext::new();
-        let task_ctx = session_ctx.task_ctx();
-        let (schema, batch) = mock_data()?;
-
-        let executor = MemoryExec::try_new(&[vec![batch]], schema, None)?;
-        let statistics = executor.statistics();
-
-        assert_eq!(statistics.num_rows, Some(3));
-        assert_eq!(
-            statistics.column_statistics,
-            Some(vec![
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(2),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-                ColumnStatistics {
-                    null_count: Some(0),
-                    max_value: None,
-                    min_value: None,
-                    distinct_count: None,
-                },
-            ])
-        );
-
-        let mut it = executor.execute(0, task_ctx)?;
-        let batch1 = it.next().await.unwrap()?;
-        assert_eq!(4, batch1.schema().fields().len());
-        assert_eq!(4, batch1.num_columns());
-
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_insert_into() -> Result<()> {
-        // Create session context
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let testdata = test_util::arrow_test_data();
-        let schema = test_util::aggr_test_schema();
-        ctx.register_csv(
-            "aggregate_test_100",
-            &format!("{testdata}/csv/aggregate_test_100.csv"),
-            CsvReadOptions::new().schema(&schema),
-        )
-        .await?;
-        ctx.sql(
-            "CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)",
-        )
-        .await?;
-
-        let sql = "INSERT INTO table_without_values SELECT
-                SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
-                COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
-                FROM aggregate_test_100
-                ORDER by c1
-            ";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "MemoryWriteExec: partitions=1, input_partition=1",
-                "  ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]",
-                "    SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
-                "      ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]",
-                "        BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Ro [...]
-                "          SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
-                "            CoalesceBatchesExec: target_batch_size=8192",
-                "              RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
-                "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    #[tokio::test]
-    async fn test_insert_into_as_select_multi_partitioned() -> Result<()> {
-        // Create session context
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let testdata = test_util::arrow_test_data();
-        let schema = test_util::aggr_test_schema();
-        ctx.register_csv(
-            "aggregate_test_100",
-            &format!("{testdata}/csv/aggregate_test_100.csv"),
-            CsvReadOptions::new().schema(&schema),
-        )
-        .await?;
-        ctx.sql(
-            "CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)",
-        )
-        .await?;
-
-        let sql = "INSERT INTO table_without_values SELECT
-                SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
-                COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
-                FROM aggregate_test_100";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "MemoryWriteExec: partitions=1, input_partition=1",
-                "  CoalescePartitionsExec",
-                "    ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]",
-                "      BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows [...]
-                "        SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
-                "          CoalesceBatchesExec: target_batch_size=8192",
-                "            RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
-                "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    // TODO: The generated plan is suboptimal since SortExec is in global state.
-    #[tokio::test]
-    async fn test_insert_into_as_select_single_partition() -> Result<()> {
-        // Create session context
-        let config = SessionConfig::new().with_target_partitions(8);
-        let ctx = SessionContext::with_config(config);
-        let testdata = test_util::arrow_test_data();
-        let schema = test_util::aggr_test_schema();
-        ctx.register_csv(
-            "aggregate_test_100",
-            &format!("{testdata}/csv/aggregate_test_100.csv"),
-            CsvReadOptions::new().schema(&schema),
-        )
-        .await?;
-        ctx.sql("CREATE TABLE table_without_values AS SELECT
-                SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
-                COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
-                FROM aggregate_test_100")
-            .await?;
-
-        let sql = "INSERT INTO table_without_values SELECT
-                SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
-                COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
-                FROM aggregate_test_100
-                ORDER BY c1";
-        let msg = format!("Creating logical plan for '{sql}'");
-        let dataframe = ctx.sql(sql).await.expect(&msg);
-        let physical_plan = dataframe.create_physical_plan().await?;
-        let formatted = displayable(physical_plan.as_ref()).indent().to_string();
-        let expected = {
-            vec![
-                "MemoryWriteExec: partitions=8, input_partition=8",
-                "  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-                "    ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]",
-                "      SortPreservingMergeExec: [c1@2 ASC NULLS LAST]",
-                "        ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]",
-                "          BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: \"SUM(aggregate_test_100.c4)\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units:  [...]
-                "            SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]",
-                "              CoalesceBatchesExec: target_batch_size=8192",
-                "                RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 8), input_partitions=8",
-                "                  RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            ]
-        };
-
-        let actual: Vec<&str> = formatted.trim().lines().collect();
-        let actual_len = actual.len();
-        let actual_trim_last = &actual[..actual_len - 1];
-        assert_eq!(
-            expected, actual_trim_last,
-            "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
-        );
-        Ok(())
-    }
-
-    // DummyPartition is a simple implementation of the PartitionStream trait.
-    // It produces a stream of record batches with a fixed schema and the same content.
-    struct DummyPartition {
-        schema: SchemaRef,
-        batch: RecordBatch,
-        num_batches: usize,
-    }
-
-    impl PartitionStream for DummyPartition {
-        // Return a reference to the schema of this partition.
-        fn schema(&self) -> &SchemaRef {
-            &self.schema
-        }
-
-        // Execute the partition stream, producing a stream of record batches.
-        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-            let batches = itertools::repeat_n(self.batch.clone(), self.num_batches);
-            Box::pin(RecordBatchStreamAdapter::new(
-                self.schema.clone(),
-                futures::stream::iter(batches).map(Ok),
-            ))
-        }
-    }
-
-    // Test the less-lock mode by inserting a large number of batches into a table.
-    #[tokio::test]
-    async fn test_one_to_one_mode() -> Result<()> {
-        let num_batches = 10000;
-        // Create a new session context
-        let session_ctx = SessionContext::new();
-        // Create a new schema with one field called "a" of type Int32
-        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
-        // Create a new batch of data to insert into the table
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
-        )?;
-        let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
-
-        let single_partition = Arc::new(DummyPartition {
-            schema: schema.clone(),
-            batch,
-            num_batches,
-        });
-        let input = Arc::new(StreamingTableExec::try_new(
-            schema.clone(),
-            vec![single_partition],
-            None,
-            false,
-        )?);
-        let plan = initial_table
-            .insert_into(&session_ctx.state(), input)
-            .await?;
-        let res = collect(plan, session_ctx.task_ctx()).await?;
-        assert!(res.is_empty());
-        // Ensure that the table now contains two batches of data in the same partition
-        assert_eq!(initial_table.batches[0].read().await.len(), num_batches);
-        Ok(())
-    }
-
-    // Test the locked mode by inserting a large number of batches into a table. It tests
-    // where the table partition count is not equal to the input's output partition count.
-    #[tokio::test]
-    async fn test_locked_mode() -> Result<()> {
-        let num_batches = 10000;
-        // Create a new session context
-        let session_ctx = SessionContext::new();
-        // Create a new schema with one field called "a" of type Int32
-        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
-
-        // Create a new batch of data to insert into the table
-        let batch = RecordBatch::try_new(
-            schema.clone(),
-            vec![Arc::new(Int32Array::from_slice([1, 2, 3]))],
-        )?;
-        let initial_table = Arc::new(MemTable::try_new(schema.clone(), vec![vec![]])?);
-
-        let single_partition = Arc::new(DummyPartition {
-            schema: schema.clone(),
-            batch,
-            num_batches,
-        });
-        let input = Arc::new(StreamingTableExec::try_new(
-            schema.clone(),
-            vec![
-                single_partition.clone(),
-                single_partition.clone(),
-                single_partition,
-            ],
-            None,
-            false,
-        )?);
-        let plan = initial_table
-            .insert_into(&session_ctx.state(), input)
-            .await?;
-        let res = collect(plan, session_ctx.task_ctx()).await?;
-        assert!(res.is_empty());
-        // Ensure that the table now contains two batches of data in the same partition
-        assert_eq!(initial_table.batches[0].read().await.len(), num_batches * 3);
-        Ok(())
-    }
-}
diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs
index c8edf701cf..bedbf84c17 100644
--- a/datafusion/core/src/physical_plan/mod.rs
+++ b/datafusion/core/src/physical_plan/mod.rs
@@ -663,6 +663,7 @@ pub mod empty;
 pub mod explain;
 pub mod file_format;
 pub mod filter;
+pub mod insert;
 pub mod joins;
 pub mod limit;
 pub mod memory;
diff --git a/datafusion/core/tests/sqllogictests/test_files/insert.slt b/datafusion/core/tests/sqllogictests/test_files/insert.slt
new file mode 100644
index 0000000000..dae5bb94f4
--- /dev/null
+++ b/datafusion/core/tests/sqllogictests/test_files/insert.slt
@@ -0,0 +1,236 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+
+#   http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+##########
+## INSERT tests
+##########
+
+
+statement ok
+CREATE EXTERNAL TABLE aggregate_test_100 (
+  c1  VARCHAR NOT NULL,
+  c2  TINYINT NOT NULL,
+  c3  SMALLINT NOT NULL,
+  c4  SMALLINT,
+  c5  INT,
+  c6  BIGINT NOT NULL,
+  c7  SMALLINT NOT NULL,
+  c8  INT NOT NULL,
+  c9  BIGINT UNSIGNED NOT NULL,
+  c10 VARCHAR NOT NULL,
+  c11 FLOAT NOT NULL,
+  c12 DOUBLE NOT NULL,
+  c13 VARCHAR NOT NULL
+)
+STORED AS CSV
+WITH HEADER ROW
+LOCATION '../../testing/data/csv/aggregate_test_100.csv'
+
+# test_insert_into
+
+statement ok
+set datafusion.execution.target_partitions = 8;
+
+statement ok
+CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL);
+
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+FROM aggregate_test_100
+ORDER by c1
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, aggregate_test_100.c1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=1)
+--ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@0 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@1 as field2]
+----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as SUM(aggregate_test_100.c4), COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as COUNT(UInt8(1)), c1@0 as c1]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Prec [...]
+----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 8), input_partitions=8
+----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+query II
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)
+FROM aggregate_test_100
+ORDER by c1
+----
+100
+
+# verify there is data now in the table
+query I
+SELECT COUNT(*) from table_without_values;
+----
+100
+
+# verify there is data now in the table
+query II
+SELECT *
+FROM table_without_values
+ORDER BY field1, field2
+LIMIT 5;
+----
+-70111 3
+-65362 3
+-62295 3
+-56721 3
+-55414 3
+
+statement ok
+drop table table_without_values;
+
+
+
+# test_insert_into_as_select_multi_partitioned
+statement ok
+CREATE TABLE table_without_values(field1 BIGINT NULL, field2 BIGINT NULL)
+
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS field2
+----WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=1)
+--CoalescePartitionsExec
+----ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as field1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as field2]
+------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preced [...]
+--------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 8), input_partitions=8
+--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+
+
+query II
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+----
+100
+
+statement ok
+drop table table_without_values;
+
+
+# test_insert_into_as_select_single_partition
+
+statement ok
+CREATE TABLE table_without_values AS SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+
+
+# // TODO: The generated plan is suboptimal since SortExec is in global state.
+query TT
+EXPLAIN
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+ORDER BY c1
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: a1 AS a1, a2 AS a2
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------Projection: SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING AS a2, aggregate_test_100.c1
+--------WindowAggr: windowExpr=[[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING]]
+----------TableScan: aggregate_test_100 projection=[c1, c4, c9]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=8)
+--ProjectionExec: expr=[a1@0 as a1, a2@1 as a2]
+----SortPreservingMergeExec: [c1@2 ASC NULLS LAST]
+------ProjectionExec: expr=[SUM(aggregate_test_100.c4) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@3 as a1, COUNT(UInt8(1)) PARTITION BY [aggregate_test_100.c1] ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING@4 as a2, c1@0 as c1]
+--------BoundedWindowAggExec: wdw=[SUM(aggregate_test_100.c4): Ok(Field { name: "SUM(aggregate_test_100.c4)", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(1)) }, COUNT(UInt8(1)): Ok(Field { name: "COUNT(UInt8(1))", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Prec [...]
+----------SortExec: expr=[c1@0 ASC NULLS LAST,c9@2 ASC NULLS LAST]
+------------CoalesceBatchesExec: target_batch_size=8192
+--------------RepartitionExec: partitioning=Hash([Column { name: "c1", index: 0 }], 8), input_partitions=8
+----------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
+------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c4, c9], has_header=true
+
+
+query II
+INSERT INTO table_without_values SELECT
+SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a1,
+COUNT(*) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) as a2
+FROM aggregate_test_100
+ORDER BY c1
+----
+100
+
+
+statement ok
+drop table table_without_values;
+
+# test_insert_into_with_sort
+
+statement ok
+create table table_without_values(c1 varchar not null);
+
+# verify that the sort order of the insert query is maintained into the
+# insert (there should be a SortExec in the following plan)
+# See https://github.com/apache/arrow-datafusion/pull/6354#discussion_r1195284178 for more background
+query TT
+explain insert into table_without_values select c1 from aggregate_test_100 order by c1;
+----
+logical_plan
+Dml: op=[Insert] table=[table_without_values]
+--Projection: aggregate_test_100.c1 AS c1
+----Sort: aggregate_test_100.c1 ASC NULLS LAST
+------TableScan: aggregate_test_100 projection=[c1]
+physical_plan
+InsertExec: sink=MemoryTable (partitions=1)
+--ProjectionExec: expr=[c1@0 as c1]
+----SortExec: expr=[c1@0 ASC NULLS LAST]
+------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1], has_header=true
+
+query T
+insert into table_without_values select c1 from aggregate_test_100 order by c1;
+----
+100
+
+query I
+select count(*) from table_without_values;
+----
+100
+
+statement ok
+drop table table_without_values;