Posted to github@arrow.apache.org by "tustvold (via GitHub)" <gi...@apache.org> on 2023/04/24 18:00:04 UTC

[GitHub] [arrow-datafusion] tustvold commented on a diff in pull request #6049: MemoryExec INSERT INTO refactor to use ExecutionPlan

tustvold commented on code in PR #6049:
URL: https://github.com/apache/arrow-datafusion/pull/6049#discussion_r1175620359


##########
datafusion/core/src/physical_plan/memory_insert.rs:
##########
@@ -0,0 +1,661 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Execution plan for writing in-memory batches of data
+
+use std::any::Any;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+use std::{fmt, mem};
+
+use crate::physical_plan::{
+    DisplayFormatType, Distribution, ExecutionPlan, Partitioning, RecordBatchStream,
+    SendableRecordBatchStream,
+};
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::{Result, Statistics};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalSortExpr;
+use futures::FutureExt;
+use futures::{ready, Stream, StreamExt};
+use tokio::sync::{OwnedRwLockWriteGuard, RwLock};
+
+// Type alias for partition data
+type PartitionData = Arc<RwLock<Vec<RecordBatch>>>;
+
+// Macro that pins a future and polls it, propagating `Poll::Pending` via `ready!`
+macro_rules! ready_poll {
+    ($e:expr, $cx:expr) => {
+        ready!(Box::pin($e).poll_unpin($cx))
+    };
+}
+
+/// Execution plan for writing record batches to an in-memory table.
+pub struct MemoryWriteExec {
+    /// Input plan that produces the record batches to be written.
+    input: Arc<dyn ExecutionPlan>,
+    /// Reference to the MemTable's partition data.
+    batches: Arc<Vec<PartitionData>>,
+    /// Schema describing the structure of the data.
+    schema: SchemaRef,
+}
+
+impl fmt::Debug for MemoryWriteExec {
+    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+        write!(f, "schema: {:?}", self.schema)
+    }
+}
+
+impl ExecutionPlan for MemoryWriteExec {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get the schema for this execution plan
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn output_partitioning(&self) -> Partitioning {
+        Partitioning::UnknownPartitioning(
+            self.input.output_partitioning().partition_count(),
+        )
+    }
+
+    fn benefits_from_input_partitioning(&self) -> bool {
+        false
+    }
+
+    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
+        self.input.output_ordering()
+    }
+
+    fn required_input_distribution(&self) -> Vec<Distribution> {
+        // If the MemTable has a single partition, require a SinglePartition
+        // distribution, since this leads to better plans in the optimizer.
+        if self.batches.len() == 1 {
+            vec![Distribution::SinglePartition]
+        } else {
+            vec![Distribution::UnspecifiedDistribution]
+        }
+    }
+
+    fn maintains_input_order(&self) -> Vec<bool> {
+        // In theory, if the MemTable partition count equals the input plan's output
+        // partition count, this execution plan can preserve the order within each partition.
+        vec![self.batches.len() == self.input.output_partitioning().partition_count()]
+    }
+
+    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
+        vec![self.input.clone()]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(MemoryWriteExec::try_new(
+            children[0].clone(),
+            self.batches.clone(),
+            self.schema.clone(),
+        )?))
+    }
+
+    /// Execute the plan and return a stream of record batches for the specified partition.
+    /// Depending on the number of input partitions and MemTable partitions, it chooses
+    /// either a one-to-one implementation that acquires fewer locks or a fully locked one.
+    fn execute(
+        &self,
+        partition: usize,
+        context: Arc<TaskContext>,
+    ) -> Result<SendableRecordBatchStream> {
+        let batch_count = self.batches.len();
+        let data = self.input.execute(partition, context)?;
+        if batch_count >= self.input.output_partitioning().partition_count() {
+            // If there are at least as many MemTable partitions as input partitions,
+            // each input partition writes to its own table partition, so use a
+            // lightweight implementation that acquires fewer locks.
+            let table_partition = self.batches[partition].clone();
+            Ok(Box::pin(MemorySinkOneToOneStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        } else {
+            // Otherwise, use the locked implementation.
+            let table_partition = self.batches[partition % batch_count].clone();
+            Ok(Box::pin(MemorySinkStream::try_new(
+                table_partition,
+                data,
+                self.schema.clone(),
+            )?))
+        }
+    }
+
+    fn fmt_as(
+        &self,
+        t: DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        match t {
+            DisplayFormatType::Default => {
+                write!(
+                    f,
+                    "MemoryWriteExec: partitions={}, input_partition={}",
+                    self.batches.len(),
+                    self.input.output_partitioning().partition_count()
+                )
+            }
+        }
+    }
+
+    fn statistics(&self) -> Statistics {
+        Statistics::default()
+    }
+}
+
+impl MemoryWriteExec {
+    /// Create a new execution plan for writing record batches to an in-memory table.
+    /// The provided `schema` should not have the projection applied.
+    pub fn try_new(
+        plan: Arc<dyn ExecutionPlan>,
+        batches: Arc<Vec<Arc<RwLock<Vec<RecordBatch>>>>>,
+        schema: SchemaRef,
+    ) -> Result<Self> {
+        Ok(Self {
+            input: plan,
+            batches,
+            schema,
+        })
+    }
+}
+
+/// This object encodes the different states of the [`MemorySinkStream`] when
+/// processing record batches.
+enum MemorySinkStreamState {
+    /// The stream is pulling data from the input.
+    Pull,
+    /// The stream is writing data to the table partition.
+    Write { maybe_batch: Option<RecordBatch> },
+}
+
+/// A stream that saves record batches in memory-backed storage.
+/// It works even when multiple input partitions map to the same table
+/// partition, achieving buffer exclusivity by locking before each write.
+pub struct MemorySinkStream {

Review Comment:
   FWIW I think this could be implemented more concisely using [`futures::unfold`](https://docs.rs/futures/latest/futures/prelude/stream/fn.unfold.html) and [`RecordBatchStreamAdapter`](https://docs.rs/datafusion/latest/datafusion/physical_plan/stream/struct.RecordBatchStreamAdapter.html).
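   For illustration, a rough sketch of that approach (untested; the `write_stream` name and the pass-through output behaviour are assumptions, not what this PR does): an `unfold` loop pulls batches from the input, appends each one to the shared partition buffer under the lock, forwards it downstream, and the resulting stream is wrapped in `RecordBatchStreamAdapter`:

   ```rust
   use std::sync::Arc;

   use arrow::datatypes::SchemaRef;
   use arrow::record_batch::RecordBatch;
   use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
   use datafusion::physical_plan::SendableRecordBatchStream;
   use futures::StreamExt;
   use tokio::sync::RwLock;

   fn write_stream(
       partition: Arc<RwLock<Vec<RecordBatch>>>,
       input: SendableRecordBatchStream,
       schema: SchemaRef,
   ) -> SendableRecordBatchStream {
       let stream = futures::stream::unfold(
           (partition, input),
           |(partition, mut input)| async move {
               // Pull the next batch; end the stream when the input is exhausted.
               let batch = match input.next().await {
                   Some(Ok(batch)) => batch,
                   Some(Err(e)) => return Some((Err(e), (partition, input))),
                   None => return None,
               };
               // Append the batch to the shared table partition under the write lock.
               partition.write().await.push(batch.clone());
               // Forward the batch downstream.
               Some((Ok(batch), (partition, input)))
           },
       );
       Box::pin(RecordBatchStreamAdapter::new(schema, stream))
   }
   ```

   This avoids hand-rolling a `Stream` state machine and the per-poll `Box::pin` in `ready_poll!`, since the lock acquisition lives inside a single `async` block.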



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org