You are viewing a plain-text version of this content. The canonical version is the original post in the commits@arrow.apache.org mailing-list archive.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/09 21:59:28 UTC

[arrow-datafusion] branch main updated: Remove SizedRecordBatchStream (#6309)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7cf248b8cc Remove SizedRecordBatchStream (#6309)
7cf248b8cc is described below

commit 7cf248b8cce13e619dde75871e308c75977770fe
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 9 22:59:22 2023 +0100

    Remove SizedRecordBatchStream (#6309)
---
 datafusion/core/src/physical_plan/common.rs       | 56 ++---------------------
 datafusion/core/src/physical_plan/explain.rs      | 18 ++------
 datafusion/core/tests/provider_filter_pushdown.rs | 21 ++++-----
 3 files changed, 16 insertions(+), 79 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 42cd8fada9..ce1299fb6d 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -17,17 +17,16 @@
 
 //! Defines common code used in execution plans
 
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::SendableRecordBatchStream;
 use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::execution::memory_pool::MemoryReservation;
-use crate::physical_plan::metrics::MemTrackingMetrics;
 use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::Schema;
 use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
 use datafusion_physical_expr::PhysicalSortExpr;
-use futures::{Future, Stream, StreamExt, TryStreamExt};
+use futures::{Future, StreamExt, TryStreamExt};
 use log::debug;
 use parking_lot::Mutex;
 use pin_project_lite::pin_project;
@@ -42,55 +41,6 @@ use tokio::task::JoinHandle;
 /// [`MemoryReservation`] used across query execution streams
 pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
 
-/// Stream of record batches
-pub struct SizedRecordBatchStream {
-    schema: SchemaRef,
-    batches: Vec<Arc<RecordBatch>>,
-    index: usize,
-    metrics: MemTrackingMetrics,
-}
-
-impl SizedRecordBatchStream {
-    /// Create a new RecordBatchIterator
-    pub fn new(
-        schema: SchemaRef,
-        batches: Vec<Arc<RecordBatch>>,
-        mut metrics: MemTrackingMetrics,
-    ) -> Self {
-        let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>();
-        metrics.init_mem_used(size);
-        SizedRecordBatchStream {
-            schema,
-            index: 0,
-            batches,
-            metrics,
-        }
-    }
-}
-
-impl Stream for SizedRecordBatchStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        let poll = Poll::Ready(if self.index < self.batches.len() {
-            self.index += 1;
-            Some(Ok(self.batches[self.index - 1].as_ref().clone()))
-        } else {
-            None
-        });
-        self.metrics.record_poll(poll)
-    }
-}
-
-impl RecordBatchStream for SizedRecordBatchStream {
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
-
 /// Create a vector of record batches from a stream
 pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
     stream.try_collect::<Vec<_>>().await
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index c46a4a7391..6eb72e4ff3 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -23,17 +23,14 @@ use std::sync::Arc;
 use crate::{
     error::{DataFusionError, Result},
     logical_expr::StringifiedPlan,
-    physical_plan::{
-        common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan, Partitioning,
-        Statistics,
-    },
+    physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics},
 };
 use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
 use log::trace;
 
 use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
 use crate::execution::context::TaskContext;
-use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
 
 /// Explain execution plan operator. This operator contains the string
 /// values of the various plans it has when it is created, and passes
@@ -150,17 +147,12 @@ impl ExecutionPlan for ExplainExec {
             ],
         )?;
 
-        let metrics = ExecutionPlanMetricsSet::new();
-        let tracking_metrics =
-            MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
-
         trace!(
-            "Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+            "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
 
-        Ok(Box::pin(SizedRecordBatchStream::new(
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema.clone(),
-            vec![Arc::new(record_batch)],
-            tracking_metrics,
+            futures::stream::iter(vec![Ok(record_batch)]),
         )))
     }
 
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index 36b0789829..ac1eef850d 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -23,9 +23,8 @@ use datafusion::datasource::datasource::{TableProvider, TableType};
 use datafusion::error::Result;
 use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
-use datafusion::physical_plan::common::SizedRecordBatchStream;
 use datafusion::physical_plan::expressions::PhysicalSortExpr;
-use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::{
     DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
 };
@@ -56,7 +55,7 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
 #[derive(Debug)]
 struct CustomPlan {
     schema: SchemaRef,
-    batches: Vec<Arc<RecordBatch>>,
+    batches: Vec<RecordBatch>,
 }
 
 impl ExecutionPlan for CustomPlan {
@@ -89,16 +88,12 @@ impl ExecutionPlan for CustomPlan {
 
     fn execute(
         &self,
-        partition: usize,
-        context: Arc<TaskContext>,
+        _partition: usize,
+        _context: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let metrics = ExecutionPlanMetricsSet::new();
-        let tracking_metrics =
-            MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
-        Ok(Box::pin(SizedRecordBatchStream::new(
+        Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema(),
-            self.batches.clone(),
-            tracking_metrics,
+            futures::stream::iter(self.batches.clone().into_iter().map(Ok)),
         )))
     }
 
@@ -183,8 +178,8 @@ impl TableProvider for CustomProvider {
                 Ok(Arc::new(CustomPlan {
                     schema: self.zero_batch.schema(),
                     batches: match int_value {
-                        0 => vec![Arc::new(self.zero_batch.clone())],
-                        1 => vec![Arc::new(self.one_batch.clone())],
+                        0 => vec![self.zero_batch.clone()],
+                        1 => vec![self.one_batch.clone()],
                         _ => vec![],
                     },
                 }))