Posted to github@arrow.apache.org by "metesynnada (via GitHub)" <gi...@apache.org> on 2023/04/18 15:36:38 UTC

[GitHub] [arrow-datafusion] metesynnada opened a new pull request, #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

metesynnada opened a new pull request, #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049

   # Which issue does this PR close?
   
   Closes #6008.
   
   # Rationale for this change
   
   INSERT INTO is currently implemented separately from the query planner, which is avoidable. Like other major engines, we can represent inserts in both `LogicalPlan` and `PhysicalPlan`.
   
   # What changes are included in this PR?
   - How a MemTable holds its `RecordBatch` data,
   - Moving the insert DML logical plan into `create_initial_plan` instead of adding a separate plan (note: pay attention here),
   - `MemoryWriterExec` for execution.
   
   # Are these changes tested?
   Yes
   
   # Are there any user-facing changes?
   
   Now, to execute the insert query, the user needs to call `collect`, as in all major engines.
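
To make the user-facing change concrete, here is a minimal sketch of an insert modelled as a plan node that does nothing until it is collected. It uses only the standard library. `Plan`, `Batch` and `plan_insert` are hypothetical stand-ins, not DataFusion types, and returning a row count from the insert is an assumption of the sketch rather than something the PR states.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for an Arrow `RecordBatch`; here just a vector of row values.
type Batch = Vec<i32>;

/// Hypothetical, heavily simplified plan node (not a DataFusion type).
enum Plan {
    /// Produces the given batches.
    Values(Vec<Batch>),
    /// Writes the child's output into `target` when executed.
    Insert { input: Box<Plan>, target: Arc<RwLock<Vec<Batch>>> },
}

impl Plan {
    /// Runs the plan. Nothing is written to any table until this is called.
    fn collect(&self) -> Vec<Batch> {
        match self {
            Plan::Values(batches) => batches.clone(),
            Plan::Insert { input, target } => {
                let batches = input.collect();
                let rows: usize = batches.iter().map(|b| b.len()).sum();
                target.write().unwrap().extend(batches);
                // Assumption of this sketch: report the inserted row count
                // as a single-value batch.
                vec![vec![rows as i32]]
            }
        }
    }
}

/// Builds an insert plan over `table` without executing it.
fn plan_insert(table: &Arc<RwLock<Vec<Batch>>>, data: Vec<Batch>) -> Plan {
    Plan::Insert { input: Box::new(Plan::Values(data)), target: Arc::clone(table) }
}
```

Building the plan leaves the table untouched; only the call to `collect` performs the write, which is the behaviour described above.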


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] aprimadi commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "aprimadi (via GitHub)" <gi...@apache.org>.
aprimadi commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1178618270


##########
datafusion/core/src/datasource/datasource.rs:
##########
@@ -102,8 +102,8 @@ pub trait TableProvider: Sync + Send {
     async fn insert_into(
         &self,
         _state: &SessionState,
-        _input: &LogicalPlan,
-    ) -> Result<()> {
+        _input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {

Review Comment:
   Just a question because I'm kinda new to this project. If `insert_into` takes as input `Vec<SendableRecordBatchStream>`, shouldn't it be able to run multiple partitions in parallel?
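
As an illustration of the parallelism this question is about, the sketch below writes several input partitions concurrently when every table partition sits behind its own lock. It uses plain threads and `std::sync::RwLock` instead of async streams, and `Batch` is a hypothetical stand-in for `RecordBatch`; it is not the code in this PR.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i32>;
/// Same shape as the `PartitionData` alias in the diff, with a std lock.
type PartitionData = Arc<RwLock<Vec<Batch>>>;

/// Writes each input partition into the table partition with the same index,
/// one thread per partition. Assumes both sides have the same partition count.
fn write_partitions(table: &[PartitionData], inputs: Vec<Vec<Batch>>) {
    assert_eq!(table.len(), inputs.len());
    let handles: Vec<_> = inputs
        .into_iter()
        .zip(table.iter().cloned())
        .map(|(batches, partition)| {
            thread::spawn(move || {
                // Only this partition's lock is taken, so writers do not contend.
                partition.write().unwrap().extend(batches);
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

With one lock per partition, the writers never wait on each other; a single lock around all partitions would serialize them.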





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175622446


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))
+    };
+}
+
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Arc<Vec<PartitionData>>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the partition count of the MemTable is one, we want to require SinglePartition
+        // since it would induce better plans in plan optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if MemTable partition count equals the input plans output partition count,
+        // the Execution plan can preserve the order inside the partitions.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it will choose
+    /// either a less lock acquiring or a locked implementation.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If the number of input partitions matches the number of MemTable partitions,
+            // use a lightweight implementation that doesn't utilize as many locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(

Review Comment:
   FWIW I think this could be implemented more concisely using [`futures::unfold`](https://docs.rs/futures/latest/futures/prelude/stream/fn.unfold.html) and [`RecordBatchStreamAdapter`](https://docs.rs/datafusion/latest/datafusion/physical_plan/stream/struct.RecordBatchStreamAdapter.html).
   
   





[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1177504793


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))
+    };
+}
+
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Arc<Vec<PartitionData>>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the partition count of the MemTable is one, we want to require SinglePartition
+        // since it would induce better plans in plan optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if MemTable partition count equals the input plans output partition count,
+        // the Execution plan can preserve the order inside the partitions.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it will choose
+    /// either a less lock acquiring or a locked implementation.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If the number of input partitions matches the number of MemTable partitions,
+            // use a lightweight implementation that doesn't utilize as many locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(

Review Comment:
   For one, I am not sure how to acquire the async lock within the fold. Also, whatever you fold onto becomes the output, so it is unclear to me how to make it yield `Result<RecordBatch>`. Can you suggest an approach here?
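
One possible reading of the `unfold` suggestion: the state threaded through `unfold` is the pair (input, partition), not the output; each step takes the lock, appends one batch, and yields a `Result` for that batch. The sketch below shows this shape with a small synchronous `unfold` written here for illustration (the `futures` version takes a closure returning a future instead). `Batch`, `write_stream` and the `String` error type are hypothetical stand-ins.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i32>;
type PartitionData = Arc<RwLock<Vec<Batch>>>;

/// Minimal synchronous analogue of `futures::stream::unfold`: `step` receives
/// the state and returns the next item plus the next state, or `None` to stop.
fn unfold<S, T>(init: S, mut step: impl FnMut(S) -> Option<(T, S)>) -> impl Iterator<Item = T> {
    let mut state = Some(init);
    std::iter::from_fn(move || {
        let (item, next) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}

/// Drains `input` into `partition`, yielding one `Result` per written batch.
fn write_stream(
    input: impl Iterator<Item = Batch>,
    partition: PartitionData,
) -> impl Iterator<Item = Result<Batch, String>> {
    unfold((input, partition), |(mut input, partition)| {
        let batch = input.next()?;
        // The lock is taken inside the step and released before yielding.
        let item = match partition.write() {
            Ok(mut guard) => {
                guard.push(batch.clone());
                Ok(batch)
            }
            Err(_) => Err("partition lock poisoned".to_string()),
        };
        Some((item, (input, partition)))
    })
}
```

In the async setting the step would presumably await the lock instead of blocking on it, and the resulting stream of `Result<RecordBatch>` is what `RecordBatchStreamAdapter` would wrap together with the schema.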





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175625987


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))

Review Comment:
   The Box is redundant if calling poll_unpin
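
A small standard-library sketch of the point about `poll_unpin`: it amounts to `Pin::new(&mut fut).poll(cx)`, so a future that is already `Unpin` can be polled without boxing. The `poll_unpin` function below is a local re-implementation for illustration, and `NoopWake` and `poll_both` are hypothetical helpers. Note that this only applies to `Unpin` futures; a future produced by an `async` block or `async fn` is not `Unpin` and still has to be pinned somehow before it can be polled.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll futures that are already complete.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Local equivalent of `FutureExt::poll_unpin` from the `futures` crate:
/// pin a mutable reference to an `Unpin` future and poll it.
fn poll_unpin<F: Future + Unpin>(fut: &mut F, cx: &mut Context<'_>) -> Poll<F::Output> {
    Pin::new(fut).poll(cx)
}

/// Polls the same kind of future once directly and once through `Box::pin`.
fn poll_both() -> (Poll<u32>, Poll<u32>) {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);

    // `std::future::Ready` is `Unpin`, so no allocation is needed.
    let mut direct = std::future::ready(7u32);
    // Boxing also works, but only adds a heap allocation here.
    let mut boxed = Box::pin(std::future::ready(7u32));

    (poll_unpin(&mut direct, &mut cx), poll_unpin(&mut boxed, &mut cx))
}
```

Both polls return the same ready value; the boxed variant just pays for an extra allocation.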





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175629119


##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -19,33 +19,31 @@
 //! queried by DataFusion. This allows data to be pre-loaded into memory and then
 //! repeatedly queried without incurring additional file I/O overhead.
 
-use futures::{StreamExt, TryStreamExt};
 use std::any::Any;
 use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
-use datafusion_expr::LogicalPlan;
+use futures::StreamExt;
 use tokio::sync::RwLock;
-use tokio::task;
 
 use crate::datasource::{TableProvider, TableType};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
-use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::common;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::memory::MemoryExec;
+use crate::physical_plan::memory_insert::MemoryWriteExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
 
 /// In-memory table
 #[derive(Debug)]
 pub struct MemTable {
     schema: SchemaRef,
-    batches: Arc<RwLock<Vec<Vec<RecordBatch>>>>,
+    pub(crate) batches: Arc<Vec<Arc<RwLock<Vec<RecordBatch>>>>>,

Review Comment:
   This seems to have a lot more indirections than really necessary, it could conceivably be `Vec<Arc<RwLock<Vec<RecordBatch>>>>`
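
To see why the outer `Arc` is not needed for sharing, the sketch below uses the flatter layout with standard-library types. Cloning the `Vec` copies only the inner `Arc` pointers, so a writer holding the clone still appends to the table's own partitions. `Batch`, `Partitions`, `new_partitions` and `share` are hypothetical names for illustration.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i32>;

/// Layout suggested in the review: no outer `Arc` around the partition list.
type Partitions = Vec<Arc<RwLock<Vec<Batch>>>>;

/// Creates `n` empty partitions.
fn new_partitions(n: usize) -> Partitions {
    (0..n).map(|_| Arc::new(RwLock::new(Vec::new()))).collect()
}

/// Hands a writer its own handle list. Cloning the `Vec` copies only the
/// `Arc` pointers, so both lists refer to the same partition storage.
fn share(partitions: &Partitions) -> Partitions {
    partitions.clone()
}
```

The trade-off is that each clone copies one pointer per partition, whereas the outer `Arc` makes a clone a single reference-count increment at the cost of one more level of indirection.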





[GitHub] [arrow-datafusion] mustafasrepo merged pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "mustafasrepo (via GitHub)" <gi...@apache.org>.
mustafasrepo merged PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049




[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175640347


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))
+    };
+}
+
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Arc<Vec<PartitionData>>,

Review Comment:
   More an observation, but relates to https://github.com/apache/arrow-datafusion/issues/5076, it seems like MemoryWriteExec is very tightly coupled with the internal data structures of MemTable. I think this is inevitable for all `TableProvider`, which makes me wonder if this should really just be an internal implementation detail of a given `TableProvider`, as it will be highly specific?





[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179692890


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   This is simply a baby step towards implementing things like `COPY TO`s, `INSERT INTO`s and functionality like that. In practical cases, these things will write their outputs through a `TableProvider` like a `ListingTable`. How do we test what that will look like and iterate on such functionality? Basically, this is the simplest writer that one can perform end-to-end (read-transform-write) experiments with; its main purpose is to serve as a basis/example for anyone implementing read/write operators for their own `TableProvider`s. I hope it makes a little more sense now.
   
   > If you guys feel strongly that this is the right and maintainable solution going forward I will defer to your judgement, but from where I see it just adds complexity / technical debt for no reason.
   
   Thank you -- rest assured we will be maintaining and helping with related work going forward.







[GitHub] [arrow-datafusion] metesynnada commented on pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#issuecomment-1519624238

   Thank you for reviewing and providing valuable feedback. I appreciate your input on the change to `TableProvider::insert_into` to accept an `ExecutionPlan`. My intention behind this implementation was to align with the discussion in #5076, which suggests re-implementing the `DataFrame.write_*` methods to use a `LogicalPlan::Write` operation along with a physical operator to perform the write.
   
   The rationale behind this approach is to create a more unified and extensible architecture for handling write operations, similar to other major engines like Apache Spark, which incorporates write operations in both logical and physical plans.
   
   I understand your concerns about creating an `ExecutionPlan` as output, specifically regarding the methods that might not make sense in this context, like "output ordering" and "preserves input." In light of the related issue, I believe that integrating write operations into the `ExecutionPlan` can help address the need for a more generic `LogicalPlan::Write` operation. `INSERT INTO` support was just the beginning.
   
   If you believe that simplifying the PR without losing functionality is the best course of action, I am willing to make the necessary changes. However, I would appreciate further discussion on how to align this PR with the goals outlined in issue #5076 while addressing your concerns about the resulting `ExecutionPlan`.
   
   Thank you again for your feedback, and I look forward to refining this PR with your assistance and in accordance with the project's roadmap.




[GitHub] [arrow-datafusion] alamb commented on pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#issuecomment-1520525427

   So I guess I would prefer one of the following
   1. Remove the ExecutionPlan from this PR
   2. Make the ExecutionPlan more general (work for any table provider) rather than just the MemExec




[GitHub] [arrow-datafusion] metesynnada commented on pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#issuecomment-1523644348

   Thanks for the review @alamb and @tustvold, just sent a commit to address your concerns. We moved `MemoryWriteExec` to `memory.rs` and removed the new module in the physical plan. Now, `memory.rs` contains the operator reading from it (`MemoryExec`) and the operator writing into it (`MemoryWriteExec`). We also made some fixes per Rafael’s comments. Can you please take a look and share your thoughts? Thanks.




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1178155013


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   I may have misunderstood the discussion on https://github.com/apache/arrow-datafusion/issues/5076#issuecomment-1523554165
   
   But I thought the consensus was to have a generic ExecutionPlan for writing data, rather than one that was specific to MemTable
   
   So something like
   
   ```
   pub struct MemoryWriteExec {
       /// Input plan that produces the record batches to be written.
       input: Arc<dyn ExecutionPlan>,
       /// Target  TableProvider (or table factory?).
       target: Arc<dyn TableFactory>,
       /// Schema describing the structure of the data.
       schema: SchemaRef,
   }
   ```
   
   🤔 
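
A rough, standard-library-only sketch of what a writer that is not tied to `MemTable` internals could look like: the operator holds a trait object for the target and never touches the storage layout. `WriteTarget`, `MemTarget` and `GenericWriter` are hypothetical names invented for this sketch; they are not DataFusion APIs and are not what the struct above proposes in detail.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i32>;

/// Hypothetical write-side interface a table could expose.
trait WriteTarget: Send + Sync {
    /// Appends `batch` to the given partition and returns the rows written.
    fn write(&self, partition: usize, batch: Batch) -> Result<usize, String>;
}

/// In-memory target, standing in for `MemTable`.
struct MemTarget {
    partitions: Vec<RwLock<Vec<Batch>>>,
}

impl WriteTarget for MemTarget {
    fn write(&self, partition: usize, batch: Batch) -> Result<usize, String> {
        let slot = self.partitions.get(partition).ok_or("no such partition")?;
        let rows = batch.len();
        slot.write().map_err(|_| "lock poisoned".to_string())?.push(batch);
        Ok(rows)
    }
}

/// Writer that knows nothing about how the target stores its data.
struct GenericWriter {
    target: Arc<dyn WriteTarget>,
}

impl GenericWriter {
    /// Writes every batch of one input partition; returns total rows written.
    fn execute(&self, partition: usize, input: Vec<Batch>) -> Result<usize, String> {
        let mut rows = 0;
        for batch in input {
            rows += self.target.write(partition, batch)?;
        }
        Ok(rows)
    }
}
```

The storage-specific locking lives entirely inside the `WriteTarget` implementation, so another table type would only need its own implementation of the trait.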





[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179310840


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   While experimenting with the single-plan idea, my main challenge was deciding where the insert operation should actually execute, i.e., what the `insert_into()` API would return. Do we return a `tokio::AsyncWrite` (which accepts a `RecordBatch`, to use in a `futures::stream`), a `futures::stream`, or possibly create a new trait tailored specifically to the TableProvider information?
   If the API were to return a `futures::stream` as the operator, it might not allow for a single execution plan without some workarounds. If I use a `tokio::AsyncWrite` instead, it may be possible to use a single executor, though we haven’t used this before in DataFusion.
   All in all, having a distinct `ExecutionPlan` to handle writes for each data source mirrors the behavior on the read side and seems much more natural. What do you think?
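   
   To make the "mirrors the read side" point concrete, here is a minimal std-only sketch. Everything in it is a hypothetical stand-in rather than a DataFusion API: `Batch` replaces `RecordBatch`, the synchronous `Plan` trait replaces `ExecutionPlan`, and `Table` replaces a `TableProvider` whose `scan` and `insert_into` both hand back a plan.
   
   ```rust
   use std::sync::{Arc, RwLock};
   
   /// Stand-in for a `RecordBatch` (assumption for illustration only).
   type Batch = Vec<i32>;
   /// Storage shared by a table and the plans it hands out.
   type Partitions = Arc<Vec<RwLock<Vec<Batch>>>>;
   
   /// Hypothetical, synchronous stand-in for `ExecutionPlan`.
   trait Plan {
       fn partition_count(&self) -> usize;
       fn execute(&self, partition: usize) -> Vec<Batch>;
   }
   
   /// Read side: returns a copy of the batches stored in one partition.
   struct ReadPlan {
       data: Partitions,
   }
   
   impl Plan for ReadPlan {
       fn partition_count(&self) -> usize {
           self.data.len()
       }
       fn execute(&self, partition: usize) -> Vec<Batch> {
           self.data[partition].read().unwrap().clone()
       }
   }
   
   /// Write side: drains one input partition into the table and emits nothing.
   struct WritePlan {
       input: Arc<dyn Plan>,
       data: Partitions,
   }
   
   impl Plan for WritePlan {
       fn partition_count(&self) -> usize {
           self.input.partition_count()
       }
       fn execute(&self, partition: usize) -> Vec<Batch> {
           let batches = self.input.execute(partition);
           // Wrap around when the input has more partitions than the table.
           let target = partition % self.data.len();
           self.data[target].write().unwrap().extend(batches);
           Vec::new()
       }
   }
   
   /// Hypothetical provider: both directions return a plan.
   struct Table {
       data: Partitions,
   }
   
   impl Table {
       /// `partitions` must be at least 1.
       fn new(partitions: usize) -> Self {
           let data: Vec<RwLock<Vec<Batch>>> =
               (0..partitions).map(|_| RwLock::new(Vec::new())).collect();
           Table { data: Arc::new(data) }
       }
       fn push(&self, partition: usize, batch: Batch) {
           self.data[partition].write().unwrap().push(batch);
       }
       fn scan(&self) -> Arc<dyn Plan> {
           Arc::new(ReadPlan { data: Arc::clone(&self.data) })
       }
       fn insert_into(&self, input: Arc<dyn Plan>) -> Arc<dyn Plan> {
           Arc::new(WritePlan { input, data: Arc::clone(&self.data) })
       }
   }
   
   /// Runs `INSERT INTO dst SELECT * FROM src` and returns the row count of `dst`.
   fn copy_rows(src: &Table, dst: &Table) -> usize {
       let write = dst.insert_into(src.scan());
       for p in 0..write.partition_count() {
           write.execute(p);
       }
       let read = dst.scan();
       (0..read.partition_count())
           .flat_map(|p| read.execute(p))
           .map(|batch| batch.len())
           .sum()
   }
   
   fn main() {
       let src = Table::new(2);
       src.push(0, vec![1, 2]);
       src.push(1, vec![3]);
       let dst = Table::new(1);
       println!("rows in dst: {}", copy_rows(&src, &dst));
   }
   ```
   
   In this shape the data source decides how input partitions map onto its storage, which is the part a stream- or writer-returning API would have to work around.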





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179709793


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Vec<PartitionData>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the partition count of the MemTable is one, we want to require SinglePartition
+        // since it would induce better plans in plan optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if MemTable partition count equals the input plans output partition count,
+        // the Execution plan can preserve the order inside the partitions.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it will choose
+    /// either a less lock acquiring or a locked implementation.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If the number of input partitions matches the number of MemTable partitions,
+            // use a lightweight implementation that doesn't utilize as many locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        } else {
+            // Otherwise, use the locked implementation.
+            let table_partition = self.batches[partition % batch_count].clone();
+            Ok(Box::pin(MemorySinkStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "MemoryWriteExec: partitions={}, input_partition={}",
+                    self.batches.len(),
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+impl MemoryWriteExec {
+    /// Create a new execution plan for reading in-memory record batches
+    /// The provided `schema` should not have the projection applied.
+    pub fn try_new(
+        plan: Arc<dyn ExecutionPlan>,
+        batches: Vec<Arc<RwLock<Vec<RecordBatch>>>>,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            input: plan,
+            batches,
+            schema,
+        })
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkStream`] when
+/// processing record batches.
+enum MemorySinkStreamState {
+    /// The stream is pulling data from the input.
+    Pull,
+    /// The stream is writing data to the table partition.
+    Write { maybe_batch: Option<RecordBatch> },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// Can work even when multiple input partitions map to the same table
+/// partition, achieves buffer exclusivity by locking before writing.
+pub struct MemorySinkStream {
+    /// Stream of record batches to be inserted into the memory table.
+    data: SendableRecordBatchStream,
+    /// Memory table partition that stores the record batches.
+    table_partition: PartitionData,
+    /// Schema representing the structure of the data.
+    schema: SchemaRef,
+    /// State of the iterator when processing multiple polls.
+    state: MemorySinkStreamState,
+}
+
+impl MemorySinkStream {
+    /// Create a new `MemorySinkStream` with the provided parameters.
+    pub fn try_new(
+        table_partition: PartitionData,
+        data: SendableRecordBatchStream,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            table_partition,
+            data,
+            schema,
+            state: MemorySinkStreamState::Pull,
+        })
+    }
+
+    /// Implementation of the `poll_next` method. Continuously polls the record
+    /// batch stream, switching between the Pull and Write states. In case of
+    /// an error, returns the error immediately.
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            match &mut self.state {
+                MemorySinkStreamState::Pull => {
+                    // Pull data from the input stream.
+                    if let Some(result) = ready!(self.data.as_mut().poll_next(cx)) {
+                        match result {
+                            Ok(batch) => {
+                                // Switch to the Write state with the received batch.
+                                self.state = MemorySinkStreamState::Write {
+                                    maybe_batch: Some(batch),
+                                }
+                            }
+                            Err(e) => return Poll::Ready(Some(Err(e))), // Return the error immediately.
+                        }
+                    } else {
+                        return Poll::Ready(None); // If the input stream is exhausted, return None.
+                    }
+                }
+                MemorySinkStreamState::Write { maybe_batch } => {
+                    // Acquire a write lock on the table partition.
+                    let mut partition =
+                        ready!(self.table_partition.write().boxed().poll_unpin(cx));
+                    if let Some(b) = mem::take(maybe_batch) {
+                        partition.push(b); // Insert the batch into the table partition.
+                    }
+                    self.state = MemorySinkStreamState::Pull; // Switch back to the Pull state.
+                }
+            }
+        }
+    }
+}
+
+impl Stream for MemorySinkStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl RecordBatchStream for MemorySinkStream {
+    /// Get the schema
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkOneToOneStream`]
+/// when processing record batches.
+enum MemorySinkOneToOneStreamState {
+    /// The `Acquire` variant represents the state where the [`MemorySinkOneToOneStream`]
+    /// is waiting to acquire the write lock on the shared partition to store the record batches.
+    Acquire,
+
+    /// The `Pull` variant represents the state where the [`MemorySinkOneToOneStream`] has
+    /// acquired the write lock on the shared partition and can pull record batches from
+    /// the input stream to store in the partition.
+    Pull {
+        /// The `partition` field contains an [`OwnedRwLockWriteGuard`] which wraps the
+        /// shared partition, providing exclusive write access to the underlying `Vec<RecordBatch>`.
+        partition: OwnedRwLockWriteGuard<Vec<RecordBatch>>,
+    },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// Assumes that every table partition has at most one corresponding input
+/// partition, so it locks the table partition only once.
+pub struct MemorySinkOneToOneStream {

Review Comment:
   Same here, I would keep these private so they can be tweaked later, e.g. to just use futures::stream::unfold





[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179120795


##########
datafusion/core/src/datasource/datasource.rs:
##########
@@ -102,8 +102,8 @@ pub trait TableProvider: Sync + Send {
     async fn insert_into(
         &self,
         _state: &SessionState,
-        _input: &LogicalPlan,
-    ) -> Result<()> {
+        _input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {

Review Comment:
   Hi @aprimadi, thanks for the question 😀. If this is the only consideration, then yes, it would be possible with some workarounds. However, this is not the right design pattern for providing concurrency. We usually use the `execute()` API in `ExecutionPlan` to achieve this type of parallelism, since it has access to the input partition information, the output partition count, etc.





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175629119


##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -19,33 +19,31 @@
 //! queried by DataFusion. This allows data to be pre-loaded into memory and then
 //! repeatedly queried without incurring additional file I/O overhead.
 
-use futures::{StreamExt, TryStreamExt};
 use std::any::Any;
 use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
-use datafusion_expr::LogicalPlan;
+use futures::StreamExt;
 use tokio::sync::RwLock;
-use tokio::task;
 
 use crate::datasource::{TableProvider, TableType};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
-use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::common;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::memory::MemoryExec;
+use crate::physical_plan::memory_insert::MemoryWriteExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
 
 /// In-memory table
 #[derive(Debug)]
 pub struct MemTable {
     schema: SchemaRef,
-    batches: Arc<RwLock<Vec<Vec<RecordBatch>>>>,
+    pub(crate) batches: Arc<Vec<Arc<RwLock<Vec<RecordBatch>>>>>,

Review Comment:
   This seems to have a lot more indirection than is really necessary; it could conceivably be `Arc<[RwLock<Vec<RecordBatch>>]>`
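   
   A minimal sketch of that flatter layout, assuming `std::sync::RwLock` in place of the PR's `tokio::sync::RwLock` and a `Vec<i32>` stand-in for `RecordBatch`; the helper names are made up for illustration:
   
   ```rust
   use std::sync::{Arc, RwLock};
   
   /// Stand-in for a `RecordBatch` (assumption for illustration only).
   type Batch = Vec<i32>;
   
   /// A single shared allocation holds the reference counts and every
   /// per-partition lock, instead of an `Arc` around a `Vec` of `Arc`s.
   type Partitions = Arc<[RwLock<Vec<Batch>>]>;
   
   /// Builds `n` empty partitions; `Arc<[T]>` can be collected directly.
   fn new_partitions(n: usize) -> Partitions {
       (0..n).map(|_| RwLock::new(Vec::new())).collect()
   }
   
   /// Appends one batch to the given partition under its own lock.
   fn append(partitions: &Partitions, partition: usize, batch: Batch) {
       partitions[partition].write().unwrap().push(batch);
   }
   
   /// Number of batches currently stored in each partition.
   fn batch_counts(partitions: &Partitions) -> Vec<usize> {
       partitions.iter().map(|p| p.read().unwrap().len()).collect()
   }
   
   fn main() {
       let table = new_partitions(2);
       // Handing the storage to a writer only bumps one reference count.
       let writer_handle = Arc::clone(&table);
       append(&writer_handle, 1, vec![1, 2, 3]);
       println!("{:?}", batch_counts(&table));
   }
   ```
   
   One caveat, as far as I understand tokio's API: `write_owned` (which produces the `OwnedRwLockWriteGuard` used in this PR) takes an `Arc<RwLock<_>>`, so this layout would likely mean switching to borrowed guards.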





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175717815


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))
+    };
+}
+
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Arc<Vec<PartitionData>>,

Review Comment:
   > which makes me wonder if this should really just be an internal implementation detail of a given TableProvider, as it will be highly specific?
   
   yes, this is my preference as well (mentioned in https://github.com/apache/arrow-datafusion/pull/6049#issuecomment-1520524109)





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175620359


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))
+    };
+}
+
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Arc<Vec<PartitionData>>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the partition count of the MemTable is one, we want to require SinglePartition
+        // since it would induce better plans in plan optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if MemTable partition count equals the input plans output partition count,
+        // the Execution plan can preserve the order inside the partitions.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it will choose
+    /// either a less lock acquiring or a locked implementation.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If the number of input partitions matches the number of MemTable partitions,
+            // use a lightweight implementation that doesn't utilize as many locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        } else {
+            // Otherwise, use the locked implementation.
+            let table_partition = self.batches[partition % batch_count].clone();
+            Ok(Box::pin(MemorySinkStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "MemoryWriteExec: partitions={}, input_partition={}",
+                    self.batches.len(),
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+impl MemoryWriteExec {
+    /// Create a new execution plan for reading in-memory record batches
+    /// The provided `schema` should not have the projection applied.
+    pub fn try_new(
+        plan: Arc<dyn ExecutionPlan>,
+        batches: Arc<Vec<Arc<RwLock<Vec<RecordBatch>>>>>,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            input: plan,
+            batches,
+            schema,
+        })
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkStream`] when
+/// processing record batches.
+enum MemorySinkStreamState {
+    /// The stream is pulling data from the input.
+    Pull,
+    /// The stream is writing data to the table partition.
+    Write { maybe_batch: Option<RecordBatch> },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// Can work even when multiple input partitions map to the same table
+/// partition, achieves buffer exclusivity by locking before writing.
+pub struct MemorySinkStream {

Review Comment:
   FWIW I think this could be implemented more concisely using [`futures::stream::unfold`](https://docs.rs/futures/latest/futures/prelude/stream/fn.unfold.html) and [`RecordBatchStreamAdapter`](https://docs.rs/datafusion/latest/datafusion/physical_plan/stream/struct.RecordBatchStreamAdapter.html).
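   
   To show the shape without pulling in async dependencies, here is a synchronous, std-only analogue. The local `unfold` helper is a hand-written iterator counterpart of `futures::stream::unfold` (same "state in, `Option<(item, state)>` out" contract), `Batch` stands in for `RecordBatch`, and `sink` is a hypothetical name. Unlike the stream in this PR, the sketch yields a row count per batch so the effect is observable.
   
   ```rust
   use std::sync::{Arc, RwLock};
   
   /// Stand-in for a `RecordBatch` (assumption for illustration only).
   type Batch = Vec<i32>;
   
   /// Iterator analogue of `futures::stream::unfold`: calls `f` with the
   /// current state until it returns `None`.
   fn unfold<S, T>(
       init: S,
       mut f: impl FnMut(S) -> Option<(T, S)>,
   ) -> impl Iterator<Item = T> {
       let mut state = Some(init);
       std::iter::from_fn(move || {
           let (item, next) = f(state.take()?)?;
           state = Some(next);
           Some(item)
       })
   }
   
   /// Pulls batches from `input`, appends each one to `partition`, and yields
   /// the number of rows written. Errors from the input are passed through.
   fn sink<I>(
       input: I,
       partition: Arc<RwLock<Vec<Batch>>>,
   ) -> impl Iterator<Item = Result<usize, String>>
   where
       I: Iterator<Item = Result<Batch, String>>,
   {
       unfold((input, partition), |(mut input, partition)| {
           // `?` ends the unfold once the input is exhausted.
           let item = match input.next()? {
               Ok(batch) => {
                   let rows = batch.len();
                   partition.write().unwrap().push(batch);
                   Ok(rows)
               }
               Err(e) => Err(e),
           };
           Some((item, (input, partition)))
       })
   }
   
   fn main() {
       let partition = Arc::new(RwLock::new(Vec::new()));
       let input = vec![Ok(vec![1, 2]), Ok(vec![3])].into_iter();
       let written: Vec<_> = sink(input, Arc::clone(&partition)).collect();
       println!("{:?} -> {} batches stored", written, partition.read().unwrap().len());
   }
   ```
   
   The explicit `Pull`/`Write` state enum disappears because the state tuple threaded through the closure plays that role.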





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1174595757


##########
datafusion/core/src/datasource/datasource.rs:
##########
@@ -102,8 +102,8 @@ pub trait TableProvider: Sync + Send {
     async fn insert_into(
         &self,
         _state: &SessionState,
-        _input: &LogicalPlan,
-    ) -> Result<()> {
+        _input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {

Review Comment:
   I agree that taking an `ExecutionPlan` is probably better than a `LogicalPlan`, as it keeps planning separate from execution.
   
   I also thought about making it a `SendableRecordBatchStream` to avoid plans entirely, but that would prevent things like running multiple partitions in parallel if the datasource supports it. 
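   
   A rough std-only sketch of what the plan-based signature keeps possible. `Plan` is a hypothetical stand-in for an `ExecutionPlan` with one independent batch source per partition, `Batch` stands in for `RecordBatch`, threads stand in for async tasks, and the table is assumed to have at least one partition:
   
   ```rust
   use std::sync::{Arc, Mutex};
   use std::thread;
   
   /// Stand-in for a `RecordBatch` (assumption for illustration only).
   type Batch = Vec<i32>;
   
   /// Hypothetical stand-in for an `ExecutionPlan`.
   struct Plan {
       partitions: Vec<Vec<Batch>>,
   }
   
   impl Plan {
       fn partition_count(&self) -> usize {
           self.partitions.len()
       }
       /// Each partition can be executed independently of the others.
       fn execute(&self, partition: usize) -> impl Iterator<Item = Batch> + '_ {
           self.partitions[partition].iter().cloned()
       }
   }
   
   /// Because the sink receives the plan rather than one merged stream, it can
   /// drive every input partition on its own thread. Returns the rows written.
   fn insert_parallel(plan: Arc<Plan>, table: Arc<Vec<Mutex<Vec<Batch>>>>) -> usize {
       let handles: Vec<_> = (0..plan.partition_count())
           .map(|p| {
               let (plan, table) = (Arc::clone(&plan), Arc::clone(&table));
               thread::spawn(move || {
                   let mut rows = 0;
                   for batch in plan.execute(p) {
                       rows += batch.len();
                       // Wrap around if the table has fewer partitions.
                       table[p % table.len()].lock().unwrap().push(batch);
                   }
                   rows
               })
           })
           .collect();
       handles.into_iter().map(|h| h.join().unwrap()).sum()
   }
   
   fn main() {
       let plan = Arc::new(Plan {
           partitions: vec![vec![vec![1, 2]], vec![vec![3], vec![4, 5, 6]]],
       });
       let table = Arc::new(vec![Mutex::new(Vec::new()), Mutex::new(Vec::new())]);
       println!("rows written: {}", insert_parallel(plan, table));
   }
   ```
   
   With a single pre-merged stream as the argument, the loop body would be the only place batches arrive, so the per-partition fan-out above would not be available to the data source.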





[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1178155013


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   I may have misunderstood the discussion on https://github.com/apache/arrow-datafusion/issues/5076#issuecomment-1523554165
   
   But I thought the consensus was to have a generic ExecutionPlan for writing data, rather than one that was specific to MemTable
   
   So something like
   
   ```
   pub struct WriterExec {
       /// Input plan that produces the record batches to be written.
       input: Arc<dyn ExecutionPlan>,
       /// Target  TableProvider (or table factory?).
       target: Arc<dyn TableFactory>,
       /// Schema describing the structure of the data.
       schema: SchemaRef,
   }
   ```
   
   🤔 





[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179692890


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   This is simply a baby step toward implementing things like `COPY TO`, `INSERT INTO` and similar functionality. In practical cases, these operations will write their outputs through a `TableProvider` like a `ListingTable`. How do we test what that will look like and iterate on such functionality? Basically, this is the simplest writer with which one can perform end-to-end (read-transform-write) experiments. Its main purpose is to serve as a basis/example for anyone implementing their own write operators. I hope it makes a little more sense now.
   
   > If you guys feel strongly that this is the right and maintainable solution going forward I will defer to your judgement, but from where I see it just adds complexity / technical debt for no reason.
   
   Thank you -- rest assured we will be maintaining and helping with related work going forward.





[GitHub] [arrow-datafusion] alamb commented on pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#issuecomment-1520524109

   > If you believe that simplifying the PR without losing functionality is the best course of action, I am willing to make the necessary changes.
   
   This is my preference -- and if/when we find that having an `ExecutionPlan` that implements the write logic is useful, we can perhaps revive that part of this PR
   
   >  However, I would appreciate further discussion on how to align this PR with the goals outlined in issue https://github.com/apache/arrow-datafusion/issues/5076 while addressing your concerns about the resulting ExecutionPlan.
   
   My high-level thinking is that a TableProvider should be a "Sink" that takes the streams produced by executing plans
   
   After thinking about it more, I think an `ExecutionPlan` for implementing Inserts might be ok for the reasons you mention. 
   
   However, the concerns I have with the `MemoryWriteExec` in this PR are:
   1. It is tightly bound to the specific `MemTable` TableProvider but lives in the general-purpose physical plan area -- it seems the `MemTable`-specific functionality should stay near / within the `datasource` module.
   2. Most of the logic in `MemoryWriteExec` seems general purpose -- only the actual `RwLock<Vec<RecordBatch>>` seems specific to memory.
   
   My reading of https://github.com/apache/arrow-datafusion/issues/5076 and https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-SparkPlan-DataWritingCommandExec.html suggests that there is a single specific physical operator that handles writes to arbitrary datasources, which is how I would prefer DataFusion proceed.
   
   I left some additional comments about this plan on https://github.com/apache/arrow-datafusion/issues/5076#issuecomment-1520521168
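
To make the "one general-purpose write operator, source-specific storage" idea concrete, here is a minimal sketch. It is not DataFusion code: `Batch`, `DataSink`, `MemSink`, and `write_partition` are hypothetical names, `Batch` stands in for `RecordBatch`, and `std::sync::RwLock` stands in for the tokio lock used in the PR.

```rust
use std::sync::{Arc, RwLock};

/// Stand-in for an Arrow `RecordBatch`: here just a vector of row values.
type Batch = Vec<i64>;

/// Hypothetical sink abstraction: the only source-specific piece of a write.
trait DataSink: Send + Sync {
    /// Append one batch to the given output partition.
    fn write(&self, partition: usize, batch: Batch);
    /// Number of output partitions the sink exposes.
    fn partition_count(&self) -> usize;
}

/// In-memory sink with one lock per partition, in the spirit of `MemTable`.
struct MemSink {
    partitions: Vec<Arc<RwLock<Vec<Batch>>>>,
}

impl MemSink {
    fn new(n: usize) -> Self {
        let partitions = (0..n).map(|_| Arc::new(RwLock::new(Vec::new()))).collect();
        Self { partitions }
    }
}

impl DataSink for MemSink {
    fn write(&self, partition: usize, batch: Batch) {
        self.partitions[partition].write().unwrap().push(batch);
    }

    fn partition_count(&self) -> usize {
        self.partitions.len()
    }
}

/// Source-agnostic write operator: drains one input partition into the sink
/// and returns the number of rows written.
fn write_partition(
    input_partition: usize,
    input: impl Iterator<Item = Batch>,
    sink: &dyn DataSink,
) -> usize {
    // Map input partitions onto sink partitions round-robin.
    let target = input_partition % sink.partition_count();
    let mut rows = 0;
    for batch in input {
        rows += batch.len();
        sink.write(target, batch);
    }
    rows
}
```

Under this split, a different datasource would only supply another `DataSink` implementation, while the operator logic stays shared.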




[GitHub] [arrow-datafusion] tustvold commented on pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#issuecomment-1527406034

   I've filed https://github.com/apache/arrow-datafusion/pull/6154 to try to refactor the code to be a little clearer, PTAL




[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179619431


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   > All in all, having a distinct ExecutionPlan to handle writes for each data source mirrors the behavior on the read side and seems much more natural. What do you think?
   
   I guess I don't fully understand why adding 800 lines of `MemTable`-specific `ExecutionPlan` that seems to add no new functionality is an improvement.
   
   Not only does this pattern seem to overly complicate this PR, but it also sets us on a course for every other table source to have to do the same (mostly repeated) things.
   
   Perhaps there is a future use case I don't understand.
   
   If you guys feel strongly that this is the right and maintainable solution going forward I will defer to your judgement, but from where I see it just adds complexity / technical debt for no reason. 





[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1176075832


##########
datafusion/core/src/datasource/memory.rs:
##########
@@ -19,33 +19,31 @@
 //! queried by DataFusion. This allows data to be pre-loaded into memory and then
 //! repeatedly queried without incurring additional file I/O overhead.
 
-use futures::{StreamExt, TryStreamExt};
 use std::any::Any;
 use std::sync::Arc;
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
 use async_trait::async_trait;
-use datafusion_expr::LogicalPlan;
+use futures::StreamExt;
 use tokio::sync::RwLock;
-use tokio::task;
 
 use crate::datasource::{TableProvider, TableType};
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::logical_expr::Expr;
-use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use crate::physical_plan::common;
 use crate::physical_plan::common::AbortOnDropSingle;
 use crate::physical_plan::memory::MemoryExec;
+use crate::physical_plan::memory_insert::MemoryWriteExec;
 use crate::physical_plan::ExecutionPlan;
 use crate::physical_plan::{repartition::RepartitionExec, Partitioning};
 
 /// In-memory table
 #[derive(Debug)]
 pub struct MemTable {
     schema: SchemaRef,
-    batches: Arc<RwLock<Vec<Vec<RecordBatch>>>>,
+    pub(crate) batches: Arc<Vec<Arc<RwLock<Vec<RecordBatch>>>>>,

Review Comment:
   You are right, we can move on with `Vec<Arc<RwLock<Vec<RecordBatch>>>>`.
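
As a rough illustration of why the per-partition layout `Vec<Arc<RwLock<Vec<RecordBatch>>>>` helps, the sketch below gives each partition its own lock so that writers to different partitions never contend. It is written under assumptions: `Batch` is a stand-in for `RecordBatch`, the helper names are hypothetical, and `std::sync::RwLock` with threads replaces the tokio lock and async tasks of the PR.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

type Batch = Vec<i64>;
type PartitionData = Arc<RwLock<Vec<Batch>>>;

/// Build `n` independently locked, empty partitions.
fn new_partitions(n: usize) -> Vec<PartitionData> {
    (0..n).map(|_| Arc::new(RwLock::new(Vec::new()))).collect()
}

/// Spawn one writer per partition; each writer only ever takes its own lock.
fn fill_concurrently(partitions: &[PartitionData], batches_per_partition: usize) {
    let handles: Vec<_> = partitions
        .iter()
        .cloned() // cloning an `Arc` copies a pointer, not the batches
        .enumerate()
        .map(|(idx, partition)| {
            thread::spawn(move || {
                for i in 0..batches_per_partition {
                    partition.write().unwrap().push(vec![idx as i64, i as i64]);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().unwrap();
    }
}
```

Cloning the outer `Vec` is also cheap here, since it only clones the `Arc` handles; that is why the extra outer `Arc` is not needed to share the partitions with a write operator.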





[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1177515381


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))

Review Comment:
   Did you try it in code? It will produce an error, since the lock future (`impl Future<Output = RwLockWriteGuard<'_, Vec<RecordBatch>>>`) does not implement `Unpin`. `BoxFuture`s do implement it; I think you did the same thing in `FileStreamState`?
   
   Is there another poll method that does not require pinning the future? I could use that.
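
One common way to poll a future that is not `Unpin` from a hand-written `poll` method is to box and pin it once, store it in the state, and poll that same box on every wake-up. The sketch below uses only the standard library; `YieldOnce`, `acquire`, `Sink`, and `noop_waker` are hypothetical helpers, and `acquire` merely stands in for a lock acquisition that cannot finish on the first poll.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Future that returns `Pending` once before completing.
struct YieldOnce {
    yielded: bool,
}

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.yielded {
            Poll::Ready(())
        } else {
            self.yielded = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

/// Futures produced by `async fn` are not `Unpin`.
async fn acquire() -> u32 {
    YieldOnce { yielded: false }.await;
    42
}

/// Hypothetical sink state that keeps the in-flight future between polls.
struct Sink {
    pending: Option<Pin<Box<dyn Future<Output = u32> + Send>>>,
}

impl Sink {
    fn poll_acquire(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        // Create the future only once; later polls reuse the same pinned box.
        if self.pending.is_none() {
            self.pending = Some(Box::pin(acquire()));
        }
        let fut = self.pending.as_mut().unwrap();
        match fut.as_mut().poll(cx) {
            Poll::Ready(value) => {
                self.pending = None;
                Poll::Ready(value)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough to drive the example by hand.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWaker))
}
```

In this example, rebuilding the future inside every call to `poll_acquire` would never complete, because each fresh `acquire()` starts over and returns `Pending` on its first poll. Keeping the boxed future in the state avoids that.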





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175620359


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro to simplify polling a future
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))
+    };
+}
+
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Arc<Vec<PartitionData>>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the partition count of the MemTable is one, we want to require SinglePartition
+        // since it would induce better plans in plan optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if MemTable partition count equals the input plans output partition count,
+        // the Execution plan can preserve the order inside the partitions.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it will choose
+    /// either a less lock acquiring or a locked implementation.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If the number of input partitions matches the number of MemTable partitions,
+            // use a lightweight implementation that doesn't utilize as many locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        } else {
+            // Otherwise, use the locked implementation.
+            let table_partition = self.batches[partition % batch_count].clone();
+            Ok(Box::pin(MemorySinkStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "MemoryWriteExec: partitions={}, input_partition={}",
+                    self.batches.len(),
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+impl MemoryWriteExec {
+    /// Create a new execution plan for reading in-memory record batches
+    /// The provided `schema` should not have the projection applied.
+    pub fn try_new(
+        plan: Arc<dyn ExecutionPlan>,
+        batches: Arc<Vec<Arc<RwLock<Vec<RecordBatch>>>>>,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            input: plan,
+            batches,
+            schema,
+        })
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkStream`] when
+/// processing record batches.
+enum MemorySinkStreamState {
+    /// The stream is pulling data from the input.
+    Pull,
+    /// The stream is writing data to the table partition.
+    Write { maybe_batch: Option<RecordBatch> },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// Can work even when multiple input partitions map to the same table
+/// partition, achieves buffer exclusivity by locking before writing.
+pub struct MemorySinkStream {

Review Comment:
   FWIW I think this could be implemented more concisely using [`futures::stream::unfold`](https://docs.rs/futures/latest/futures/prelude/stream/fn.unfold.html) and [`RecordBatchStreamAdapter`](https://docs.rs/datafusion/latest/datafusion/physical_plan/stream/struct.RecordBatchStreamAdapter.html).
   
   You won't be able to name the type, but I don't think this is necessary?
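
The shape of that suggestion can be illustrated with a synchronous analogue. The sketch below uses `std::iter::from_fn` rather than the async `unfold` from the `futures` crate, so it is only an approximation of the idea: the state lives in the closure's captures, and no named stream type or explicit state enum is required. `Batch` and `sink_iter` are hypothetical, and yielding per-batch row counts is an illustrative choice, not what the PR's stream returns.

```rust
use std::sync::{Arc, RwLock};

type Batch = Vec<i64>;

/// Returns an iterator that stores each input batch in `partition` and yields
/// the number of rows written for that batch.
fn sink_iter(
    mut input: impl Iterator<Item = Batch>,
    partition: Arc<RwLock<Vec<Batch>>>,
) -> impl Iterator<Item = usize> {
    std::iter::from_fn(move || {
        // "Pull" step: stop as soon as the input is exhausted.
        let batch = input.next()?;
        let rows = batch.len();
        // "Write" step: lock the partition only for the duration of the push.
        partition.write().unwrap().push(batch);
        Some(rows)
    })
}
```

The return type is `impl Iterator`, which mirrors the remark that the concrete type cannot be named; callers only see the trait.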





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179333939


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   I agree that a single plan is likely not tractable, it will be highly specific to the `TableProvider` in question. What I had meant was simply not returning anything and leaving everything an internal implementation detail of the `TableProvider` from how it executes the provided `ExecutionPlan` through to how that is exposed. Not only would this make this PR significantly simpler, obviating the need for any of the ExecutionPlan or Stream shenanigans, but would provide the most flexibility for table implementations that have more complex scheduling or transaction handling requirements. Exposing `ExecutionPlan` is a pretty strong API commitment, and I _personally_ am not confident this won't be painting ourselves into a corner, but if the consensus is otherwise :shrug: 
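
A minimal sketch of the alternative described above, where the provider runs the input itself and hands nothing back to the planner, might look like the following. All names are hypothetical stand-ins rather than DataFusion's API: `Plan` plays the role of `ExecutionPlan`, `Batch` the role of `RecordBatch`, and the trait and struct here only borrow the names `TableProvider` and `MemTable` for readability.

```rust
use std::sync::RwLock;

type Batch = Vec<i64>;

/// Stand-in for an `ExecutionPlan`: anything that can produce batches.
trait Plan {
    fn execute(&self) -> Box<dyn Iterator<Item = Batch> + '_>;
}

/// Hypothetical provider trait: `insert_into` executes the input internally
/// and only reports how many rows were written.
trait TableProvider {
    fn insert_into(&self, input: &dyn Plan) -> Result<u64, String>;
}

/// Trivial input plan that replays a fixed list of batches.
struct Values(Vec<Batch>);

impl Plan for Values {
    fn execute(&self) -> Box<dyn Iterator<Item = Batch> + '_> {
        Box::new(self.0.iter().cloned())
    }
}

struct MemTable {
    batches: RwLock<Vec<Batch>>,
}

impl TableProvider for MemTable {
    fn insert_into(&self, input: &dyn Plan) -> Result<u64, String> {
        // Buffer first, then publish under a single lock: one simple form of
        // the scheduling/transaction freedom the provider keeps to itself.
        let staged: Vec<Batch> = input.execute().collect();
        let rows: u64 = staged.iter().map(|b| b.len() as u64).sum();
        self.batches
            .write()
            .map_err(|e| e.to_string())?
            .extend(staged);
        Ok(rows)
    }
}
```

Because callers only see a row count, the provider could later change how it executes or commits the write without touching a public plan or stream type.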





[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "tustvold (via GitHub)" <gi...@apache.org>.
tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179709577


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Vec<PartitionData>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the partition count of the MemTable is one, we want to require SinglePartition
+        // since it would induce better plans in plan optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if MemTable partition count equals the input plans output partition count,
+        // the Execution plan can preserve the order inside the partitions.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it will choose
+    /// either a less lock acquiring or a locked implementation.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If the number of input partitions matches the number of MemTable partitions,
+            // use a lightweight implementation that doesn't utilize as many locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        } else {
+            // Otherwise, use the locked implementation.
+            let table_partition = self.batches[partition % batch_count].clone();
+            Ok(Box::pin(MemorySinkStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "MemoryWriteExec: partitions={}, input_partition={}",
+                    self.batches.len(),
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+impl MemoryWriteExec {
+    /// Create a new execution plan for reading in-memory record batches
+    /// The provided `schema` should not have the projection applied.
+    pub fn try_new(
+        plan: Arc<dyn ExecutionPlan>,
+        batches: Vec<Arc<RwLock<Vec<RecordBatch>>>>,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            input: plan,
+            batches,
+            schema,
+        })
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkStream`] when
+/// processing record batches.
+enum MemorySinkStreamState {
+    /// The stream is pulling data from the input.
+    Pull,
+    /// The stream is writing data to the table partition.
+    Write { maybe_batch: Option<RecordBatch> },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// Can work even when multiple input partitions map to the same table
+/// partition, achieves buffer exclusivity by locking before writing.
+pub struct MemorySinkStream {

Review Comment:
   Does this need to be public?



##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Vec<PartitionData>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the partition count of the MemTable is one, we want to require SinglePartition
+        // since it would induce better plans in plan optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if MemTable partition count equals the input plans output partition count,
+        // the Execution plan can preserve the order inside the partitions.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it will choose
+    /// either a less lock acquiring or a locked implementation.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If the number of input partitions matches the number of MemTable partitions,
+            // use a lightweight implementation that doesn't utilize as many locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        } else {
+            // Otherwise, use the locked implementation.
+            let table_partition = self.batches[partition % batch_count].clone();
+            Ok(Box::pin(MemorySinkStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "MemoryWriteExec: partitions={}, input_partition={}",
+                    self.batches.len(),
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+impl MemoryWriteExec {
+    /// Create a new execution plan for reading in-memory record batches
+    /// The provided `schema` should not have the projection applied.
+    pub fn try_new(
+        plan: Arc<dyn ExecutionPlan>,
+        batches: Vec<Arc<RwLock<Vec<RecordBatch>>>>,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            input: plan,
+            batches,
+            schema,
+        })
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkStream`] when
+/// processing record batches.
+enum MemorySinkStreamState {
+    /// The stream is pulling data from the input.
+    Pull,
+    /// The stream is writing data to the table partition.
+    Write { maybe_batch: Option<RecordBatch> },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// Can work even when multiple input partitions map to the same table
+/// partition, achieves buffer exclusivity by locking before writing.
+pub struct MemorySinkStream {
+    /// Stream of record batches to be inserted into the memory table.
+    data: SendableRecordBatchStream,
+    /// Memory table partition that stores the record batches.
+    table_partition: PartitionData,
+    /// Schema representing the structure of the data.
+    schema: SchemaRef,
+    /// State of the iterator when processing multiple polls.
+    state: MemorySinkStreamState,
+}
+
+impl MemorySinkStream {
+    /// Create a new `MemorySinkStream` with the provided parameters.
+    pub fn try_new(
+        table_partition: PartitionData,
+        data: SendableRecordBatchStream,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            table_partition,
+            data,
+            schema,
+            state: MemorySinkStreamState::Pull,
+        })
+    }
+
+    /// Implementation of the `poll_next` method. Continuously polls the record
+    /// batch stream, switching between the Pull and Write states. In case of
+    /// an error, returns the error immediately.
+    fn poll_next_impl(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Result<RecordBatch>>> {
+        loop {
+            match &mut self.state {
+                MemorySinkStreamState::Pull => {
+                    // Pull data from the input stream.
+                    if let Some(result) = ready!(self.data.as_mut().poll_next(cx)) {
+                        match result {
+                            Ok(batch) => {
+                                // Switch to the Write state with the received batch.
+                                self.state = MemorySinkStreamState::Write {
+                                    maybe_batch: Some(batch),
+                                }
+                            }
+                            Err(e) => return Poll::Ready(Some(Err(e))), // Return the error immediately.
+                        }
+                    } else {
+                        return Poll::Ready(None); // If the input stream is exhausted, return None.
+                    }
+                }
+                MemorySinkStreamState::Write { maybe_batch } => {
+                    // Acquire a write lock on the table partition.
+                    let mut partition =
+                        ready!(self.table_partition.write().boxed().poll_unpin(cx));
+                    if let Some(b) = mem::take(maybe_batch) {
+                        partition.push(b); // Insert the batch into the table partition.
+                    }
+                    self.state = MemorySinkStreamState::Pull; // Switch back to the Pull state.
+                }
+            }
+        }
+    }
+}
+
+impl Stream for MemorySinkStream {
+    type Item = Result<RecordBatch>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        self.poll_next_impl(cx)
+    }
+}
+
+impl RecordBatchStream for MemorySinkStream {
+    /// Get the schema
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkOneToOneStream`]
+/// when processing record batches.
+enum MemorySinkOneToOneStreamState {
+    /// The `Acquire` variant represents the state where the [`MemorySinkOneToOneStream`]
+    /// is waiting to acquire the write lock on the shared partition to store the record batches.
+    Acquire,
+
+    /// The `Pull` variant represents the state where the [`MemorySinkOneToOneStream`] has
+    /// acquired the write lock on the shared partition and can pull record batches from
+    /// the input stream to store in the partition.
+    Pull {
+        /// The `partition` field contains an [`OwnedRwLockWriteGuard`] which wraps the
+        /// shared partition, providing exclusive write access to the underlying `Vec<RecordBatch>`.
+        partition: OwnedRwLockWriteGuard<Vec<RecordBatch>>,
+    },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// Assumes that every table partition has at most one corresponding input
+/// partition, so it locks the table partition only once.
+pub struct MemorySinkOneToOneStream {

Review Comment:
   Same here, I would keep these private
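
For readers following the diff above, the difference between the two sink streams can be sketched without async machinery. This is only an illustration under assumptions: `Batch`, `Partition`, `sink_per_batch` and `sink_one_to_one` are hypothetical names, and a blocking `std::sync::RwLock` stands in for the async lock used in the PR. The first function mirrors the Pull/Write loop (one lock acquisition per batch); the second mirrors the Acquire/Pull split (one acquisition for the whole input).

```rust
use std::sync::{Arc, RwLock};

// Hypothetical stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i32>;
// Hypothetical stand-in for one shared table partition.
type Partition = Arc<RwLock<Vec<Batch>>>;

/// Mirrors the Pull/Write loop: take the write lock once per batch.
/// Returns how many times the lock was acquired.
fn sink_per_batch(input: impl Iterator<Item = Batch>, partition: &Partition) -> usize {
    let mut acquisitions = 0;
    for batch in input {
        let mut guard = partition.write().unwrap();
        acquisitions += 1;
        guard.push(batch);
        // `guard` is dropped here, releasing the lock before the next pull.
    }
    acquisitions
}

/// Mirrors the Acquire/Pull split: take the write lock once, then drain the input.
/// Only sensible when no other writer targets the same partition.
fn sink_one_to_one(input: impl Iterator<Item = Batch>, partition: &Partition) -> usize {
    let mut guard = partition.write().unwrap();
    for batch in input {
        guard.push(batch);
    }
    1
}
```

Holding the guard for the whole input avoids repeated locking, but it would starve any other writer to the same partition, which is presumably why the one-to-one variant is documented as assuming at most one input partition per table partition.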





[GitHub] [arrow-datafusion] metesynnada commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "metesynnada (via GitHub)" <gi...@apache.org>.
metesynnada commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179120795


##########
datafusion/core/src/datasource/datasource.rs:
##########
@@ -102,8 +102,8 @@ pub trait TableProvider: Sync + Send {
     async fn insert_into(
         &self,
         _state: &SessionState,
-        _input: &LogicalPlan,
-    ) -> Result<()> {
+        _input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {

Review Comment:
   The current implementation already executes the writes in parallel. The proposed approach only changes where that is handled.
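
To make "writes in parallel" concrete, here is a rough sketch of per-partition parallel writes, assuming a one-to-one mapping between input partitions and table partitions. It is not the PR's code: `Batch`, `Partition` and `write_partitions` are hypothetical names, and OS threads with a blocking `RwLock` stand in for async tasks and the async lock.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

// Hypothetical stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i32>;
// Hypothetical stand-in for one shared table partition.
type Partition = Arc<RwLock<Vec<Batch>>>;

/// Writes each input partition into the table partition with the same index,
/// one thread per partition. Returns the total number of rows written.
fn write_partitions(inputs: Vec<Vec<Batch>>, table: &[Partition]) -> usize {
    assert_eq!(inputs.len(), table.len(), "sketch assumes a one-to-one mapping");
    let handles: Vec<_> = inputs
        .into_iter()
        .zip(table.iter().cloned())
        .map(|(batches, partition)| {
            thread::spawn(move || {
                let mut rows = 0;
                // Each thread locks only its own partition, so writers do not contend.
                let mut guard = partition.write().unwrap();
                for batch in batches {
                    rows += batch.len();
                    guard.push(batch);
                }
                rows
            })
        })
        .collect();
    // Wait for every writer and add up the row counts.
    handles.into_iter().map(|h| h.join().unwrap()).sum()
}
```

Whether this loop lives inside the table provider or inside an execution plan node does not change the parallelism itself, only which component schedules the per-partition work.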





[GitHub] [arrow-datafusion] ozankabak commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

Posted by "ozankabak (via GitHub)" <gi...@apache.org>.
ozankabak commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1179165585


##########
datafusion/core/src/physical_plan/memory.rs:
##########
@@ -223,15 +245,365 @@ impl RecordBatchStream for MemoryStream {
     }
 }
 
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {

Review Comment:
   > But I thought the consensus was to have a generic ExecutionPlan for writing data, rather than one that was specific to MemTable
   
   Are you sure about this? My understanding was that each writer would go through the `ExecutionPlan` route, not that there would be a single plan for everything. The read side has a separate `Exec` for every `TableProvider`; why would we not follow the same pattern?
   
   Anyway, I asked @metesynnada to experiment with the single-plan idea, but he wasn't able to make progress on it. He will comment here with his findings.
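
The per-writer pattern under discussion can be sketched in a few lines. Everything here is a simplified assumption for illustration: `Plan` is a hypothetical stand-in for `ExecutionPlan`, `Values` and `MemoryWrite` are made-up nodes, and this `insert_into` is a free function rather than the `TableProvider` method. The point it shows is that the insert returns a plan wrapping its input, and nothing is written until that plan is executed.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical, heavily simplified stand-in for `ExecutionPlan`.
trait Plan {
    /// Runs the plan and returns the rows it produces.
    fn execute(&self) -> Vec<i32>;
}

/// Read-side node: yields a fixed set of rows.
struct Values(Vec<i32>);

impl Plan for Values {
    fn execute(&self) -> Vec<i32> {
        self.0.clone()
    }
}

/// Write-side node specific to one storage backend (here, an in-memory Vec).
struct MemoryWrite {
    input: Rc<dyn Plan>,
    target: Rc<RefCell<Vec<i32>>>,
}

impl Plan for MemoryWrite {
    fn execute(&self) -> Vec<i32> {
        let rows = self.input.execute();
        self.target.borrow_mut().extend(&rows);
        // Returning the written rows is an arbitrary choice made for this sketch.
        rows
    }
}

/// Hypothetical analogue of `insert_into`: wraps the input plan, writes nothing yet.
fn insert_into(target: Rc<RefCell<Vec<i32>>>, input: Rc<dyn Plan>) -> Rc<dyn Plan> {
    Rc::new(MemoryWrite { input, target })
}
```

Under this shape, a second backend would add its own write node next to `MemoryWrite`, much as each `TableProvider` has its own read-side `Exec`; a single generic write plan would instead need some backend-neutral sink abstraction behind one node.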


