You are viewing a plain-text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/05/09 21:59:28 UTC
[arrow-datafusion] branch main updated: Remove SizedRecordBatchStream (#6309)
This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7cf248b8cc Remove SizedRecordBatchStream (#6309)
7cf248b8cc is described below
commit 7cf248b8cce13e619dde75871e308c75977770fe
Author: Raphael Taylor-Davies <17...@users.noreply.github.com>
AuthorDate: Tue May 9 22:59:22 2023 +0100
Remove SizedRecordBatchStream (#6309)
---
datafusion/core/src/physical_plan/common.rs | 56 ++---------------------
datafusion/core/src/physical_plan/explain.rs | 18 ++------
datafusion/core/tests/provider_filter_pushdown.rs | 21 ++++-----
3 files changed, 16 insertions(+), 79 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index 42cd8fada9..ce1299fb6d 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -17,17 +17,16 @@
//! Defines common code used in execution plans
-use super::{RecordBatchStream, SendableRecordBatchStream};
+use super::SendableRecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryReservation;
-use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
-use arrow::datatypes::{Schema, SchemaRef};
+use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::PhysicalSortExpr;
-use futures::{Future, Stream, StreamExt, TryStreamExt};
+use futures::{Future, StreamExt, TryStreamExt};
use log::debug;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
@@ -42,55 +41,6 @@ use tokio::task::JoinHandle;
/// [`MemoryReservation`] used across query execution streams
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;
-/// Stream of record batches
-pub struct SizedRecordBatchStream {
- schema: SchemaRef,
- batches: Vec<Arc<RecordBatch>>,
- index: usize,
- metrics: MemTrackingMetrics,
-}
-
-impl SizedRecordBatchStream {
- /// Create a new RecordBatchIterator
- pub fn new(
- schema: SchemaRef,
- batches: Vec<Arc<RecordBatch>>,
- mut metrics: MemTrackingMetrics,
- ) -> Self {
- let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>();
- metrics.init_mem_used(size);
- SizedRecordBatchStream {
- schema,
- index: 0,
- batches,
- metrics,
- }
- }
-}
-
-impl Stream for SizedRecordBatchStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- let poll = Poll::Ready(if self.index < self.batches.len() {
- self.index += 1;
- Some(Ok(self.batches[self.index - 1].as_ref().clone()))
- } else {
- None
- });
- self.metrics.record_poll(poll)
- }
-}
-
-impl RecordBatchStream for SizedRecordBatchStream {
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
-
/// Create a vector of record batches from a stream
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
stream.try_collect::<Vec<_>>().await
diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs
index c46a4a7391..6eb72e4ff3 100644
--- a/datafusion/core/src/physical_plan/explain.rs
+++ b/datafusion/core/src/physical_plan/explain.rs
@@ -23,17 +23,14 @@ use std::sync::Arc;
use crate::{
error::{DataFusionError, Result},
logical_expr::StringifiedPlan,
- physical_plan::{
- common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan, Partitioning,
- Statistics,
- },
+ physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use log::trace;
use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
-use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
+use crate::physical_plan::stream::RecordBatchStreamAdapter;
/// Explain execution plan operator. This operator contains the string
/// values of the various plans it has when it is created, and passes
@@ -150,17 +147,12 @@ impl ExecutionPlan for ExplainExec {
],
)?;
- let metrics = ExecutionPlanMetricsSet::new();
- let tracking_metrics =
- MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
-
trace!(
- "Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
+ "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
- Ok(Box::pin(SizedRecordBatchStream::new(
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
- vec![Arc::new(record_batch)],
- tracking_metrics,
+ futures::stream::iter(vec![Ok(record_batch)]),
)))
}
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index 36b0789829..ac1eef850d 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -23,9 +23,8 @@ use datafusion::datasource::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
-use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
-use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
+use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
@@ -56,7 +55,7 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
#[derive(Debug)]
struct CustomPlan {
schema: SchemaRef,
- batches: Vec<Arc<RecordBatch>>,
+ batches: Vec<RecordBatch>,
}
impl ExecutionPlan for CustomPlan {
@@ -89,16 +88,12 @@ impl ExecutionPlan for CustomPlan {
fn execute(
&self,
- partition: usize,
- context: Arc<TaskContext>,
+ _partition: usize,
+ _context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- let metrics = ExecutionPlanMetricsSet::new();
- let tracking_metrics =
- MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
- Ok(Box::pin(SizedRecordBatchStream::new(
+ Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
- self.batches.clone(),
- tracking_metrics,
+ futures::stream::iter(self.batches.clone().into_iter().map(Ok)),
)))
}
@@ -183,8 +178,8 @@ impl TableProvider for CustomProvider {
Ok(Arc::new(CustomPlan {
schema: self.zero_batch.schema(),
batches: match int_value {
- 0 => vec![Arc::new(self.zero_batch.clone())],
- 1 => vec![Arc::new(self.one_batch.clone())],
+ 0 => vec![self.zero_batch.clone()],
+ 1 => vec![self.one_batch.clone()],
_ => vec![],
},
}))