You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ja...@apache.org on 2023/06/25 02:56:05 UTC

[arrow-datafusion] branch main updated: Move PartitionStream to physical_plan (#6756)

This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 878fec124e Move PartitionStream to physical_plan (#6756)
878fec124e is described below

commit 878fec124ebeb2c7fa7d384cec10ee500a68db04
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jun 24 22:55:59 2023 -0400

    Move PartitionStream to physical_plan (#6756)
---
 datafusion/core/src/catalog/information_schema.rs |  7 +++++--
 datafusion/core/src/datasource/streaming.rs       | 15 +++------------
 datafusion/core/src/physical_plan/streaming.rs    | 10 +++++++++-
 datafusion/core/tests/memory_limit.rs             |  3 ++-
 4 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 033bb3266e..b30683a3ea 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -28,13 +28,16 @@ use arrow::{
     record_batch::RecordBatch,
 };
 
-use crate::config::{ConfigEntry, ConfigOptions};
-use crate::datasource::streaming::{PartitionStream, StreamingTable};
+use crate::datasource::streaming::StreamingTable;
 use crate::datasource::TableProvider;
 use crate::execution::context::TaskContext;
 use crate::logical_expr::TableType;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use crate::physical_plan::SendableRecordBatchStream;
+use crate::{
+    config::{ConfigEntry, ConfigOptions},
+    physical_plan::streaming::PartitionStream,
+};
 
 use super::{schema::SchemaProvider, CatalogList};
 
diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs
index a5fc6f1929..1a3cd92916 100644
--- a/datafusion/core/src/datasource/streaming.rs
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -28,18 +28,9 @@ use datafusion_expr::{Expr, TableType};
 use log::debug;
 
 use crate::datasource::TableProvider;
-use crate::execution::context::{SessionState, TaskContext};
-use crate::physical_plan::streaming::StreamingTableExec;
-use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
-
-/// A partition that can be converted into a [`SendableRecordBatchStream`]
-pub trait PartitionStream: Send + Sync {
-    /// Returns the schema of this partition
-    fn schema(&self) -> &SchemaRef;
-
-    /// Returns a stream yielding this partitions values
-    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
-}
+use crate::execution::context::SessionState;
+use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec};
+use crate::physical_plan::ExecutionPlan;
 
 /// A [`TableProvider`] that streams a set of [`PartitionStream`]
 pub struct StreamingTable {
diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs
index 797e19c467..48f1409e14 100644
--- a/datafusion/core/src/physical_plan/streaming.rs
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -28,11 +28,19 @@ use datafusion_common::{DataFusionError, Result, Statistics};
 use datafusion_physical_expr::PhysicalSortExpr;
 use log::debug;
 
-use crate::datasource::streaming::PartitionStream;
 use crate::physical_plan::stream::RecordBatchStreamAdapter;
 use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
 use datafusion_execution::TaskContext;
 
+/// A partition that can be converted into a [`SendableRecordBatchStream`]
+pub trait PartitionStream: Send + Sync {
+    /// Returns the schema of this partition
+    fn schema(&self) -> &SchemaRef;
+
+    /// Returns a stream yielding this partitions values
+    fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
+}
+
 /// An [`ExecutionPlan`] for [`PartitionStream`]
 pub struct StreamingTableExec {
     partitions: Vec<Arc<dyn PartitionStream>>,
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index f2e1223dc6..36872b7361 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -19,10 +19,11 @@
 
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::streaming::PartitionStream;
 use futures::StreamExt;
 use std::sync::Arc;
 
-use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
+use datafusion::datasource::streaming::StreamingTable;
 use datafusion::datasource::MemTable;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::disk_manager::DiskManagerConfig;