You are viewing a plain text version of this content. The canonical version is the original message in the commits@arrow.apache.org mailing-list archive.
Posted to commits@arrow.apache.org by ja...@apache.org on 2023/06/25 02:56:05 UTC
[arrow-datafusion] branch main updated: Move PartitionStream to physical_plan (#6756)
This is an automated email from the ASF dual-hosted git repository.
jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 878fec124e Move PartitionStream to physical_plan (#6756)
878fec124e is described below
commit 878fec124ebeb2c7fa7d384cec10ee500a68db04
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Sat Jun 24 22:55:59 2023 -0400
Move PartitionStream to physical_plan (#6756)
---
datafusion/core/src/catalog/information_schema.rs | 7 +++++--
datafusion/core/src/datasource/streaming.rs | 15 +++------------
datafusion/core/src/physical_plan/streaming.rs | 10 +++++++++-
datafusion/core/tests/memory_limit.rs | 3 ++-
4 files changed, 19 insertions(+), 16 deletions(-)
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 033bb3266e..b30683a3ea 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -28,13 +28,16 @@ use arrow::{
record_batch::RecordBatch,
};
-use crate::config::{ConfigEntry, ConfigOptions};
-use crate::datasource::streaming::{PartitionStream, StreamingTable};
+use crate::datasource::streaming::StreamingTable;
use crate::datasource::TableProvider;
use crate::execution::context::TaskContext;
use crate::logical_expr::TableType;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::SendableRecordBatchStream;
+use crate::{
+ config::{ConfigEntry, ConfigOptions},
+ physical_plan::streaming::PartitionStream,
+};
use super::{schema::SchemaProvider, CatalogList};
diff --git a/datafusion/core/src/datasource/streaming.rs b/datafusion/core/src/datasource/streaming.rs
index a5fc6f1929..1a3cd92916 100644
--- a/datafusion/core/src/datasource/streaming.rs
+++ b/datafusion/core/src/datasource/streaming.rs
@@ -28,18 +28,9 @@ use datafusion_expr::{Expr, TableType};
use log::debug;
use crate::datasource::TableProvider;
-use crate::execution::context::{SessionState, TaskContext};
-use crate::physical_plan::streaming::StreamingTableExec;
-use crate::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
-
-/// A partition that can be converted into a [`SendableRecordBatchStream`]
-pub trait PartitionStream: Send + Sync {
- /// Returns the schema of this partition
- fn schema(&self) -> &SchemaRef;
-
- /// Returns a stream yielding this partitions values
- fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
-}
+use crate::execution::context::SessionState;
+use crate::physical_plan::streaming::{PartitionStream, StreamingTableExec};
+use crate::physical_plan::ExecutionPlan;
/// A [`TableProvider`] that streams a set of [`PartitionStream`]
pub struct StreamingTable {
diff --git a/datafusion/core/src/physical_plan/streaming.rs b/datafusion/core/src/physical_plan/streaming.rs
index 797e19c467..48f1409e14 100644
--- a/datafusion/core/src/physical_plan/streaming.rs
+++ b/datafusion/core/src/physical_plan/streaming.rs
@@ -28,11 +28,19 @@ use datafusion_common::{DataFusionError, Result, Statistics};
use datafusion_physical_expr::PhysicalSortExpr;
use log::debug;
-use crate::datasource::streaming::PartitionStream;
use crate::physical_plan::stream::RecordBatchStreamAdapter;
use crate::physical_plan::{ExecutionPlan, Partitioning, SendableRecordBatchStream};
use datafusion_execution::TaskContext;
+/// A partition that can be converted into a [`SendableRecordBatchStream`]
+pub trait PartitionStream: Send + Sync {
+ /// Returns the schema of this partition
+ fn schema(&self) -> &SchemaRef;
+
+ /// Returns a stream yielding this partition's values
+ fn execute(&self, ctx: Arc<TaskContext>) -> SendableRecordBatchStream;
+}
+
/// An [`ExecutionPlan`] for [`PartitionStream`]
pub struct StreamingTableExec {
partitions: Vec<Arc<dyn PartitionStream>>,
diff --git a/datafusion/core/tests/memory_limit.rs b/datafusion/core/tests/memory_limit.rs
index f2e1223dc6..36872b7361 100644
--- a/datafusion/core/tests/memory_limit.rs
+++ b/datafusion/core/tests/memory_limit.rs
@@ -19,10 +19,11 @@
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
+use datafusion::physical_plan::streaming::PartitionStream;
use futures::StreamExt;
use std::sync::Arc;
-use datafusion::datasource::streaming::{PartitionStream, StreamingTable};
+use datafusion::datasource::streaming::StreamingTable;
use datafusion::datasource::MemTable;
use datafusion::execution::context::SessionState;
use datafusion::execution::disk_manager::DiskManagerConfig;