You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/20 22:34:42 UTC
[arrow-datafusion] branch master updated: Remove dependency from `LogicalPlan::TableScan` to `ExecutionPlan` (#2284)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 817199b3b Remove dependency from `LogicalPlan::TableScan` to `ExecutionPlan` (#2284)
817199b3b is described below
commit 817199b3b87b6169d7acd9e01149fc97f82f9834
Author: Andy Grove <ag...@apache.org>
AuthorDate: Wed Apr 20 16:34:36 2022 -0600
Remove dependency from `LogicalPlan::TableScan` to `ExecutionPlan` (#2284)
---
ballista/rust/client/src/context.rs | 10 ++--
ballista/rust/core/src/serde/logical_plan/mod.rs | 11 ++--
datafusion/core/src/catalog/information_schema.rs | 3 +-
datafusion/core/src/dataframe.rs | 2 +-
datafusion/core/src/datasource/datasource.rs | 29 +--------
datafusion/core/src/datasource/listing/table.rs | 17 +++---
datafusion/core/src/datasource/mod.rs | 3 +-
datafusion/core/src/logical_plan/builder.rs | 7 ++-
datafusion/core/src/logical_plan/mod.rs | 1 +
datafusion/core/src/logical_plan/plan.rs | 68 +++++++++++++++++++++-
datafusion/core/src/optimizer/filter_push_down.rs | 12 ++--
datafusion/core/src/physical_plan/planner.rs | 5 +-
datafusion/core/tests/provider_filter_pushdown.rs | 4 +-
datafusion/expr/src/lib.rs | 2 +
.../datasource.rs => expr/src/table_source.rs} | 43 +++++---------
15 files changed, 126 insertions(+), 91 deletions(-)
diff --git a/ballista/rust/client/src/context.rs b/ballista/rust/client/src/context.rs
index 7d21b1121..5f77b369e 100644
--- a/ballista/rust/client/src/context.rs
+++ b/ballista/rust/client/src/context.rs
@@ -34,7 +34,9 @@ use datafusion::catalog::TableReference;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::TableProvider;
use datafusion::error::{DataFusionError, Result};
-use datafusion::logical_plan::{CreateExternalTable, FileType, LogicalPlan, TableScan};
+use datafusion::logical_plan::{
+ source_as_provider, CreateExternalTable, FileType, LogicalPlan, TableScan,
+};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, ParquetReadOptions, SessionConfig, SessionContext,
};
@@ -270,7 +272,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_csv(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
- self.register_table(name, source)
+ self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
@@ -284,7 +286,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_parquet(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
- self.register_table(name, source)
+ self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
@@ -298,7 +300,7 @@ impl BallistaContext {
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan() {
LogicalPlan::TableScan(TableScan { source, .. }) => {
- self.register_table(name, source)
+ self.register_table(name, source_as_provider(&source)?)
}
_ => Err(DataFusionError::Internal("Expected tables scan".to_owned())),
}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index d88498b86..b9ab11704 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -32,9 +32,9 @@ use datafusion::logical_plan::plan::{
Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, Window,
};
use datafusion::logical_plan::{
- Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CrossJoin, Expr,
- JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition, TableScan,
- Values,
+ source_as_provider, Column, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
+ CrossJoin, Expr, JoinConstraint, Limit, LogicalPlan, LogicalPlanBuilder, Repartition,
+ TableScan, Values,
};
use datafusion::prelude::SessionContext;
@@ -510,6 +510,7 @@ impl AsLogicalPlan for LogicalPlanNode {
projection,
..
}) => {
+ let source = source_as_provider(source)?;
let schema = source.schema();
let source = source.as_any();
@@ -982,6 +983,7 @@ mod roundtrip_tests {
use crate::serde::{AsLogicalPlan, BallistaCodec};
use async_trait::async_trait;
use core::panic;
+ use datafusion::logical_plan::source_as_provider;
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datafusion_data_access::{
@@ -1434,7 +1436,8 @@ mod roundtrip_tests {
let round_trip_store = match round_trip {
LogicalPlan::TableScan(scan) => {
- match scan.source.as_ref().as_any().downcast_ref::<ListingTable>() {
+ let source = source_as_provider(&scan.source)?;
+ match source.as_ref().as_any().downcast_ref::<ListingTable>() {
Some(listing_table) => {
format!("{:?}", listing_table.object_store())
}
diff --git a/datafusion/core/src/catalog/information_schema.rs b/datafusion/core/src/catalog/information_schema.rs
index 38306150a..7df9aec95 100644
--- a/datafusion/core/src/catalog/information_schema.rs
+++ b/datafusion/core/src/catalog/information_schema.rs
@@ -31,7 +31,8 @@ use arrow::{
};
use datafusion_common::Result;
-use crate::datasource::{MemTable, TableProvider, TableType};
+use crate::datasource::{MemTable, TableProvider};
+use crate::logical_expr::TableType;
use super::{
catalog::{CatalogList, CatalogProvider},
diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index eac9ae29a..5a7017486 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -33,8 +33,8 @@ use crate::arrow::datatypes::Schema;
use crate::arrow::datatypes::SchemaRef;
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
-use crate::datasource::TableType;
use crate::execution::context::{SessionState, TaskContext};
+use crate::logical_expr::TableType;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::{collect, collect_partitioned};
use crate::physical_plan::{execute_stream, execute_stream_partitioned, ExecutionPlan};
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/core/src/datasource/datasource.rs
index 1b59c857f..f4fdc975d 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/core/src/datasource/datasource.rs
@@ -21,40 +21,13 @@ use std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
+use datafusion_expr::{TableProviderFilterPushDown, TableType};
use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
use crate::logical_plan::Expr;
use crate::physical_plan::ExecutionPlan;
-/// Indicates whether and how a filter expression can be handled by a
-/// TableProvider for table scans.
-#[derive(Debug, Clone, PartialEq)]
-pub enum TableProviderFilterPushDown {
- /// The expression cannot be used by the provider.
- Unsupported,
- /// The expression can be used to help minimise the data retrieved,
- /// but the provider cannot guarantee that all returned tuples
- /// satisfy the filter. The Filter plan node containing this expression
- /// will be preserved.
- Inexact,
- /// The provider guarantees that all returned data satisfies this
- /// filter expression. The Filter plan node containing this expression
- /// will be removed.
- Exact,
-}
-
-/// Indicates the type of this table for metadata/catalog purposes.
-#[derive(Debug, Clone, Copy, PartialEq)]
-pub enum TableType {
- /// An ordinary physical table.
- Base,
- /// A non-materialised table that itself uses a query internally to provide data.
- View,
- /// A transient table.
- Temporary,
-}
-
/// Source table
#[async_trait]
pub trait TableProvider: Sync + Send {
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index 18f12d03a..9e554c13d 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -23,6 +23,14 @@ use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use futures::StreamExt;
+use crate::datasource::{
+ file_format::{
+ avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
+ FileFormat,
+ },
+ get_statistics_with_limit, TableProvider,
+};
+use crate::logical_expr::TableProviderFilterPushDown;
use crate::{
error::{DataFusionError, Result},
logical_plan::Expr,
@@ -33,15 +41,6 @@ use crate::{
},
};
-use crate::datasource::{
- datasource::TableProviderFilterPushDown,
- file_format::{
- avro::AvroFormat, csv::CsvFormat, json::JsonFormat, parquet::ParquetFormat,
- FileFormat,
- },
- get_statistics_with_limit, TableProvider,
-};
-
use super::PartitionedFile;
use datafusion_data_access::object_store::ObjectStore;
diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs
index 2a801ff21..f4d059e3d 100644
--- a/datafusion/core/src/datasource/mod.rs
+++ b/datafusion/core/src/datasource/mod.rs
@@ -27,11 +27,12 @@ pub mod object_store_registry;
use futures::Stream;
-pub use self::datasource::{TableProvider, TableType};
+pub use self::datasource::TableProvider;
use self::listing::PartitionedFile;
pub use self::memory::MemTable;
use crate::arrow::datatypes::{Schema, SchemaRef};
use crate::error::Result;
+pub use crate::logical_expr::TableType;
use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
use futures::StreamExt;
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index c88b25d0a..e417b0dc1 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -46,8 +46,9 @@ use std::{
use super::dfschema::ToDFSchema;
use super::{exprlist_to_fields, Expr, JoinConstraint, JoinType, LogicalPlan, PlanType};
use crate::logical_plan::{
- columnize_expr, normalize_col, normalize_cols, rewrite_sort_cols_by_aggs, Column,
- CrossJoin, DFField, DFSchema, DFSchemaRef, Limit, Partitioning, Repartition, Values,
+ columnize_expr, normalize_col, normalize_cols, provider_as_source,
+ rewrite_sort_cols_by_aggs, Column, CrossJoin, DFField, DFSchema, DFSchemaRef, Limit,
+ Partitioning, Repartition, Values,
};
use crate::sql::utils::group_window_expr_by_sort_keys;
@@ -449,7 +450,7 @@ impl LogicalPlanBuilder {
let table_scan = LogicalPlan::TableScan(TableScan {
table_name,
- source: provider,
+ source: provider_as_source(provider),
projected_schema: Arc::new(projected_schema),
projection,
filters,
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index d2c0a1fa3..7ab16affd 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -62,6 +62,7 @@ pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
+pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, Limit,
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 72fd3fa28..a26d47358 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -20,11 +20,13 @@
use super::display::{GraphvizVisitor, IndentVisitor};
use super::expr::{Column, Expr};
use super::extension::UserDefinedLogicalNode;
-use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::datasource::TableProvider;
use crate::error::DataFusionError;
+use crate::logical_expr::TableProviderFilterPushDown;
use crate::logical_plan::dfschema::DFSchemaRef;
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_expr::TableSource;
+use std::any::Any;
use std::fmt::Formatter;
use std::{
collections::HashSet,
@@ -124,13 +126,75 @@ pub struct Window {
pub schema: DFSchemaRef,
}
+/// DataFusion default table source, wrapping a [`TableProvider`].
+///
+/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource`
+/// (logical plan trait). Every `TableSource` method is answered by delegating to the
+/// wrapped provider, so the logical view of the table matches the physical one.
+pub struct DefaultTableSource {
+    /// The wrapped table provider that all `TableSource` calls are delegated to
+    pub table_provider: Arc<dyn TableProvider>,
+}
+
+impl DefaultTableSource {
+    /// Create a new `DefaultTableSource` that wraps `table_provider`
+    pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
+        Self { table_provider }
+    }
+}
+
+impl TableSource for DefaultTableSource {
+    /// Returns the table source as [`Any`](std::any::Any) so that it can be
+    /// downcast to a specific implementation (see [`source_as_provider`]).
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    /// Get a reference to the schema for this table, as reported by the
+    /// wrapped provider
+    fn schema(&self) -> SchemaRef {
+        self.table_provider.schema()
+    }
+
+    /// Get the type of this table, as reported by the wrapped provider.
+    ///
+    /// Forwarded explicitly: without this override the `TableSource` default
+    /// (`TableType::Base`) would be returned, hiding views and temporary
+    /// tables from logical planning.
+    fn table_type(&self) -> crate::logical_expr::TableType {
+        self.table_provider.table_type()
+    }
+
+    /// Tests whether the table provider can make use of a filter expression
+    /// to optimise data retrieval.
+    fn supports_filter_pushdown(
+        &self,
+        filter: &Expr,
+    ) -> datafusion_common::Result<TableProviderFilterPushDown> {
+        self.table_provider.supports_filter_pushdown(filter)
+    }
+}
+
+/// Wrap a [`TableProvider`] in a [`TableSource`] (specifically a
+/// [`DefaultTableSource`]) so it can be referenced from a logical plan.
+pub fn provider_as_source(
+    table_provider: Arc<dyn TableProvider>,
+) -> Arc<dyn TableSource> {
+    Arc::new(DefaultTableSource::new(table_provider))
+}
+
+/// Attempt to downcast a TableSource to DefaultTableSource and access the
+/// TableProvider. This will only work with a TableSource created by DataFusion.
+///
+/// # Errors
+///
+/// Returns `DataFusionError::Internal` if `source` is not a `DefaultTableSource`
+/// (for example, a `TableSource` implemented outside DataFusion).
+pub fn source_as_provider(
+    source: &Arc<dyn TableSource>,
+) -> datafusion_common::Result<Arc<dyn TableProvider>> {
+    match source
+        .as_ref()
+        .as_any()
+        .downcast_ref::<DefaultTableSource>()
+    {
+        Some(source) => Ok(source.table_provider.clone()),
+        _ => Err(DataFusionError::Internal(
+            "TableSource was not DefaultTableSource".to_string(),
+        )),
+    }
+}
+
/// Produces rows from a table provider by reference or from the context
#[derive(Clone)]
pub struct TableScan {
/// The name of the table
pub table_name: String,
/// The source of the table
- pub source: Arc<dyn TableProvider>,
+ pub source: Arc<dyn TableSource>,
/// Optional column indices to use as a projection
pub projection: Option<Vec<usize>>,
/// The schema description of the output
diff --git a/datafusion/core/src/optimizer/filter_push_down.rs b/datafusion/core/src/optimizer/filter_push_down.rs
index 30a7ee973..19535de86 100644
--- a/datafusion/core/src/optimizer/filter_push_down.rs
+++ b/datafusion/core/src/optimizer/filter_push_down.rs
@@ -14,8 +14,8 @@
//! Filter Push Down optimizer rule ensures that filters are applied as early as possible in the plan
-use crate::datasource::datasource::TableProviderFilterPushDown;
use crate::execution::context::ExecutionProps;
+use crate::logical_expr::TableProviderFilterPushDown;
use crate::logical_plan::plan::{Aggregate, Filter, Join, Projection, Union};
use crate::logical_plan::{
and, col, replace_col, Column, CrossJoin, JoinType, Limit, LogicalPlan, TableScan,
@@ -599,7 +599,11 @@ mod tests {
};
use crate::physical_plan::ExecutionPlan;
use crate::test::*;
- use crate::{logical_plan::col, prelude::JoinType};
+ use crate::{
+ logical_plan::{col, plan::provider_as_source},
+ prelude::JoinType,
+ };
+
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
@@ -1417,7 +1421,7 @@ mod tests {
(*test_provider.schema()).clone(),
)?),
projection: None,
- source: Arc::new(test_provider),
+ source: provider_as_source(Arc::new(test_provider)),
limit: None,
});
@@ -1490,7 +1494,7 @@ mod tests {
(*test_provider.schema()).clone(),
)?),
projection: Some(vec![0]),
- source: Arc::new(test_provider),
+ source: provider_as_source(Arc::new(test_provider)),
limit: None,
});
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 98076d136..5b34a65dc 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -24,8 +24,8 @@ use super::{
};
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_plan::plan::{
- Aggregate, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, TableScan,
- Window,
+ source_as_provider, Aggregate, EmptyRelation, Filter, Join, Projection, Sort,
+ SubqueryAlias, TableScan, Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, Operator,
@@ -339,6 +339,7 @@ impl DefaultPhysicalPlanner {
limit,
..
}) => {
+ let source = source_as_provider(source)?;
// Remove all qualifiers from the scan as the provider
// doesn't know (nor should care) how the relation was
// referred to in the query
diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs
index cfd903991..664e77e18 100644
--- a/datafusion/core/tests/provider_filter_pushdown.rs
+++ b/datafusion/core/tests/provider_filter_pushdown.rs
@@ -19,10 +19,10 @@ use arrow::array::{as_primitive_array, Int32Builder, UInt64Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
-use datafusion::datasource::datasource::{TableProvider, TableProviderFilterPushDown};
+use datafusion::datasource::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, TaskContext};
-use datafusion::logical_plan::Expr;
+use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index e6220e96c..81ed3d85c 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -30,6 +30,7 @@ mod literal;
mod nullif;
mod operator;
mod signature;
+mod table_source;
pub mod type_coercion;
mod udaf;
mod udf;
@@ -50,6 +51,7 @@ pub use literal::{lit, lit_timestamp_nano, Literal, TimestampLiteral};
pub use nullif::SUPPORTED_NULLIF_TYPES;
pub use operator::Operator;
pub use signature::{Signature, TypeSignature, Volatility};
+pub use table_source::{TableProviderFilterPushDown, TableSource, TableType};
pub use udaf::AggregateUDF;
pub use udf::ScalarUDF;
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
diff --git a/datafusion/core/src/datasource/datasource.rs b/datafusion/expr/src/table_source.rs
similarity index 67%
copy from datafusion/core/src/datasource/datasource.rs
copy to datafusion/expr/src/table_source.rs
index 1b59c857f..8e441e484 100644
--- a/datafusion/core/src/datasource/datasource.rs
+++ b/datafusion/expr/src/table_source.rs
@@ -15,17 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-//! Data source traits
-
+//! Table source
+
+use crate::Expr;
+use arrow::datatypes::SchemaRef;
use std::any::Any;
-use std::sync::Arc;
-
-use async_trait::async_trait;
-use crate::arrow::datatypes::SchemaRef;
-use crate::error::Result;
-use crate::logical_plan::Expr;
-use crate::physical_plan::ExecutionPlan;
/// Indicates whether and how a filter expression can be handled by a
/// TableProvider for table scans.
@@ -55,11 +49,15 @@ pub enum TableType {
Temporary,
}
-/// Source table
-#[async_trait]
-pub trait TableProvider: Sync + Send {
- /// Returns the table provider as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
+/// The TableSource trait is used during logical query planning and optimizations and
+/// provides access to schema information and filter push-down capabilities. This trait
+/// provides a subset of the functionality of the TableProvider trait in the core
+/// datafusion crate. The TableProvider trait provides additional capabilities needed for
+/// physical query execution (such as the ability to perform a scan). The reason for
+/// having two separate traits is to avoid having the logical plan code be dependent
+/// on the DataFusion execution engine. Other projects may want to use DataFusion's
+/// logical plans and have their own execution engine.
+pub trait TableSource: Sync + Send {
fn as_any(&self) -> &dyn Any;
/// Get a reference to the schema for this table
@@ -70,27 +68,12 @@ pub trait TableProvider: Sync + Send {
TableType::Base
}
- /// Create an ExecutionPlan that will scan the table.
- /// The table provider will be usually responsible of grouping
- /// the source data into partitions that can be efficiently
- /// parallelized or distributed.
- async fn scan(
- &self,
- projection: &Option<Vec<usize>>,
- filters: &[Expr],
- // limit can be used to reduce the amount scanned
- // from the datasource as a performance optimization.
- // If set, it contains the amount of rows needed by the `LogicalPlan`,
- // The datasource should return *at least* this number of rows if available.
- limit: Option<usize>,
- ) -> Result<Arc<dyn ExecutionPlan>>;
-
/// Tests whether the table provider can make use of a filter expression
/// to optimise data retrieval.
fn supports_filter_pushdown(
&self,
_filter: &Expr,
- ) -> Result<TableProviderFilterPushDown> {
+ ) -> datafusion_common::Result<TableProviderFilterPushDown> {
Ok(TableProviderFilterPushDown::Unsupported)
}
}