You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/21 16:34:16 UTC
[arrow-datafusion] branch master updated: Move `LogicalPlan` enum to `datafusion-expr` crate (#2294)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 23f1c7756 Move `LogicalPlan` enum to `datafusion-expr` crate (#2294)
23f1c7756 is described below
commit 23f1c77569d1f3b0ff42ade56f9b2afb53d44292
Author: Andy Grove <ag...@apache.org>
AuthorDate: Thu Apr 21 10:34:09 2022 -0600
Move `LogicalPlan` enum to `datafusion-expr` crate (#2294)
---
datafusion/core/src/logical_plan/mod.rs | 10 +-
datafusion/core/src/logical_plan/plan.rs | 1233 +---------
datafusion/core/src/sql/parser.rs | 27 +-
datafusion/expr/src/lib.rs | 3 +
.../{core => expr}/src/logical_plan/display.rs | 4 +-
.../{core => expr}/src/logical_plan/extension.rs | 4 +-
datafusion/expr/src/logical_plan/mod.rs | 32 +
datafusion/{core => expr}/src/logical_plan/plan.rs | 2537 ++++++++------------
8 files changed, 1134 insertions(+), 2716 deletions(-)
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index 7ab16affd..5ce0ff911 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -23,13 +23,11 @@
pub(crate) mod builder;
mod dfschema;
-mod display;
mod expr;
mod expr_rewriter;
mod expr_schema;
mod expr_simplier;
mod expr_visitor;
-mod extension;
mod operators;
pub mod plan;
mod registry;
@@ -39,7 +37,7 @@ pub use builder::{
};
pub use datafusion_expr::expr_fn::binary_expr;
pub use dfschema::{DFField, DFSchema, DFSchemaRef, ToDFSchema};
-pub use display::display_schema;
+
pub use expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
@@ -60,14 +58,12 @@ pub use expr_rewriter::{
pub use expr_schema::ExprSchemable;
pub use expr_simplier::{ExprSimplifiable, SimplifyInfo};
pub use expr_visitor::{ExprVisitable, ExpressionVisitor, Recursion};
-pub use extension::UserDefinedLogicalNode;
pub use operators::Operator;
pub use plan::{provider_as_source, source_as_provider};
pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, Limit,
- LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, TableScan, Union,
- Values,
+ LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
+ TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
};
-pub(crate) use plan::{StringifiedPlan, ToStringifiedPlan};
pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index a26d47358..89047af44 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -17,114 +17,24 @@
//! This module contains the `LogicalPlan` enum that describes queries
//! via a logical query plan.
-use super::display::{GraphvizVisitor, IndentVisitor};
-use super::expr::{Column, Expr};
-use super::extension::UserDefinedLogicalNode;
+use super::expr::Expr;
+use crate::arrow::datatypes::SchemaRef;
use crate::datasource::TableProvider;
use crate::error::DataFusionError;
-use crate::logical_expr::TableProviderFilterPushDown;
-use crate::logical_plan::dfschema::DFSchemaRef;
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_expr::TableSource;
-use std::any::Any;
-use std::fmt::Formatter;
-use std::{
- collections::HashSet,
- fmt::{self, Display},
- sync::Arc,
+pub use crate::logical_expr::{
+ logical_plan::{
+ display::{GraphvizVisitor, IndentVisitor},
+ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
+ CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension,
+ FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
+ Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
+ StringifiedPlan, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+ UserDefinedLogicalNode, Values, Window,
+ },
+ TableProviderFilterPushDown, TableSource,
};
-
-/// Join type
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum JoinType {
- /// Inner Join
- Inner,
- /// Left Join
- Left,
- /// Right Join
- Right,
- /// Full Join
- Full,
- /// Semi Join
- Semi,
- /// Anti Join
- Anti,
-}
-
-impl Display for JoinType {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- let join_type = match self {
- JoinType::Inner => "Inner",
- JoinType::Left => "Left",
- JoinType::Right => "Right",
- JoinType::Full => "Full",
- JoinType::Semi => "Semi",
- JoinType::Anti => "Anti",
- };
- write!(f, "{}", join_type)
- }
-}
-
-/// Join constraint
-#[derive(Debug, Clone, Copy)]
-pub enum JoinConstraint {
- /// Join ON
- On,
- /// Join USING
- Using,
-}
-
-/// Evaluates an arbitrary list of expressions (essentially a
-/// SELECT with an expression list) on its input.
-#[derive(Clone)]
-pub struct Projection {
- /// The list of expressions
- pub expr: Vec<Expr>,
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The schema description of the output
- pub schema: DFSchemaRef,
- /// Projection output relation alias
- pub alias: Option<String>,
-}
-
-/// Aliased subquery
-#[derive(Clone)]
-pub struct SubqueryAlias {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The alias for the input relation
- pub alias: String,
- /// The schema with qualified field names
- pub schema: DFSchemaRef,
-}
-
-/// Filters rows from its input that do not match an
-/// expression (essentially a WHERE clause with a predicate
-/// expression).
-///
-/// Semantically, `<predicate>` is evaluated for each row of the input;
-/// If the value of `<predicate>` is true, the input row is passed to
-/// the output. If the value of `<predicate>` is false, the row is
-/// discarded.
-#[derive(Clone)]
-pub struct Filter {
- /// The predicate expression, which must have Boolean type.
- pub predicate: Expr,
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
-}
-
-/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
-#[derive(Clone)]
-pub struct Window {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The window function expression
- pub window_expr: Vec<Expr>,
- /// The schema description of the window output
- pub schema: DFSchemaRef,
-}
+use std::any::Any;
+use std::sync::Arc;
/// DataFusion default table source, wrapping TableProvider
///
@@ -188,1122 +98,11 @@ pub fn source_as_provider(
}
}
-/// Produces rows from a table provider by reference or from the context
-#[derive(Clone)]
-pub struct TableScan {
- /// The name of the table
- pub table_name: String,
- /// The source of the table
- pub source: Arc<dyn TableSource>,
- /// Optional column indices to use as a projection
- pub projection: Option<Vec<usize>>,
- /// The schema description of the output
- pub projected_schema: DFSchemaRef,
- /// Optional expressions to be used as filters by the table provider
- pub filters: Vec<Expr>,
- /// Optional limit to skip reading
- pub limit: Option<usize>,
-}
-
-/// Apply Cross Join to two logical plans
-#[derive(Clone)]
-pub struct CrossJoin {
- /// Left input
- pub left: Arc<LogicalPlan>,
- /// Right input
- pub right: Arc<LogicalPlan>,
- /// The output schema, containing fields from the left and right inputs
- pub schema: DFSchemaRef,
-}
-
-/// Repartition the plan based on a partitioning scheme.
-#[derive(Clone)]
-pub struct Repartition {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The partitioning scheme
- pub partitioning_scheme: Partitioning,
-}
-
-/// Union multiple inputs
-#[derive(Clone)]
-pub struct Union {
- /// Inputs to merge
- pub inputs: Vec<LogicalPlan>,
- /// Union schema. Should be the same for all inputs.
- pub schema: DFSchemaRef,
- /// Union output relation alias
- pub alias: Option<String>,
-}
-
-/// Creates an in memory table.
-#[derive(Clone)]
-pub struct CreateMemoryTable {
- /// The table name
- pub name: String,
- /// The logical plan
- pub input: Arc<LogicalPlan>,
- /// Option to not error if table already exists
- pub if_not_exists: bool,
-}
-
-/// Types of files to parse as DataFrames
-#[derive(Debug, Clone, Copy, PartialEq)]
-pub enum FileType {
- /// Newline-delimited JSON
- NdJson,
- /// Apache Parquet columnar storage
- Parquet,
- /// Comma separated values
- CSV,
- /// Avro binary records
- Avro,
-}
-
-/// Creates an external table.
-#[derive(Clone)]
-pub struct CreateExternalTable {
- /// The table schema
- pub schema: DFSchemaRef,
- /// The table name
- pub name: String,
- /// The physical location
- pub location: String,
- /// The file type of physical file
- pub file_type: FileType,
- /// Whether the CSV file contains a header
- pub has_header: bool,
- /// Delimiter for CSV
- pub delimiter: char,
- /// Partition Columns
- pub table_partition_cols: Vec<String>,
- /// Option to not error if table already exists
- pub if_not_exists: bool,
-}
-
-/// Creates a schema.
-#[derive(Clone)]
-pub struct CreateCatalogSchema {
- /// The table schema
- pub schema_name: String,
- /// Do nothing (except issuing a notice) if a schema with the same name already exists
- pub if_not_exists: bool,
- /// Empty schema
- pub schema: DFSchemaRef,
-}
-
-/// Creates a catalog (aka "Database").
-#[derive(Clone)]
-pub struct CreateCatalog {
- /// The catalog name
- pub catalog_name: String,
- /// Do nothing (except issuing a notice) if a schema with the same name already exists
- pub if_not_exists: bool,
- /// Empty schema
- pub schema: DFSchemaRef,
-}
-
-/// Drops a table.
-#[derive(Clone)]
-pub struct DropTable {
- /// The table name
- pub name: String,
- /// If the table exists
- pub if_exists: bool,
- /// Dummy schema
- pub schema: DFSchemaRef,
-}
-
-/// Produces a relation with string representations of
-/// various parts of the plan
-#[derive(Clone)]
-pub struct Explain {
- /// Should extra (detailed, intermediate plans) be included?
- pub verbose: bool,
- /// The logical plan that is being EXPLAIN'd
- pub plan: Arc<LogicalPlan>,
- /// Represent the various stages plans have gone through
- pub stringified_plans: Vec<StringifiedPlan>,
- /// The output schema of the explain (2 columns of text)
- pub schema: DFSchemaRef,
-}
-
-/// Runs the actual plan, and then prints the physical plan with
-/// with execution metrics.
-#[derive(Clone)]
-pub struct Analyze {
- /// Should extra detail be included?
- pub verbose: bool,
- /// The logical plan that is being EXPLAIN ANALYZE'd
- pub input: Arc<LogicalPlan>,
- /// The output schema of the explain (2 columns of text)
- pub schema: DFSchemaRef,
-}
-
-/// Extension operator defined outside of DataFusion
-#[derive(Clone)]
-pub struct Extension {
- /// The runtime extension operator
- pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
-}
-
-/// Produces no rows: An empty relation with an empty schema
-#[derive(Clone)]
-pub struct EmptyRelation {
- /// Whether to produce a placeholder row
- pub produce_one_row: bool,
- /// The schema description of the output
- pub schema: DFSchemaRef,
-}
-
-/// Produces the first `n` tuples from its input and discards the rest.
-#[derive(Clone)]
-pub struct Limit {
- /// The limit
- pub n: usize,
- /// The logical plan
- pub input: Arc<LogicalPlan>,
-}
-
-/// Values expression. See
-/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
-/// documentation for more details.
-#[derive(Clone)]
-pub struct Values {
- /// The table schema
- pub schema: DFSchemaRef,
- /// Values
- pub values: Vec<Vec<Expr>>,
-}
-
-/// Aggregates its input based on a set of grouping and aggregate
-/// expressions (e.g. SUM).
-#[derive(Clone)]
-pub struct Aggregate {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// Grouping expressions
- pub group_expr: Vec<Expr>,
- /// Aggregate expressions
- pub aggr_expr: Vec<Expr>,
- /// The schema description of the aggregate output
- pub schema: DFSchemaRef,
-}
-
-/// Sorts its input according to a list of sort expressions.
-#[derive(Clone)]
-pub struct Sort {
- /// The sort expressions
- pub expr: Vec<Expr>,
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
-}
-
-/// Join two logical plans on one or more join columns
-#[derive(Clone)]
-pub struct Join {
- /// Left input
- pub left: Arc<LogicalPlan>,
- /// Right input
- pub right: Arc<LogicalPlan>,
- /// Equijoin clause expressed as pairs of (left, right) join columns
- pub on: Vec<(Column, Column)>,
- /// Join type
- pub join_type: JoinType,
- /// Join constraint
- pub join_constraint: JoinConstraint,
- /// The output schema, containing fields from the left and right inputs
- pub schema: DFSchemaRef,
- /// If null_equals_null is true, null == null else null != null
- pub null_equals_null: bool,
-}
-
-/// A LogicalPlan represents the different types of relational
-/// operators (such as Projection, Filter, etc) and can be created by
-/// the SQL query planner and the DataFrame API.
-///
-/// A LogicalPlan represents transforming an input relation (table) to
-/// an output relation (table) with a (potentially) different
-/// schema. A plan represents a dataflow tree where data flows
-/// from leaves up to the root to produce the query result.
-#[derive(Clone)]
-pub enum LogicalPlan {
- /// Evaluates an arbitrary list of expressions (essentially a
- /// SELECT with an expression list) on its input.
- Projection(Projection),
- /// Filters rows from its input that do not match an
- /// expression (essentially a WHERE clause with a predicate
- /// expression).
- ///
- /// Semantically, `<predicate>` is evaluated for each row of the input;
- /// If the value of `<predicate>` is true, the input row is passed to
- /// the output. If the value of `<predicate>` is false, the row is
- /// discarded.
- Filter(Filter),
- /// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
- Window(Window),
- /// Aggregates its input based on a set of grouping and aggregate
- /// expressions (e.g. SUM).
- Aggregate(Aggregate),
- /// Sorts its input according to a list of sort expressions.
- Sort(Sort),
- /// Join two logical plans on one or more join columns
- Join(Join),
- /// Apply Cross Join to two logical plans
- CrossJoin(CrossJoin),
- /// Repartition the plan based on a partitioning scheme.
- Repartition(Repartition),
- /// Union multiple inputs
- Union(Union),
- /// Produces rows from a table provider by reference or from the context
- TableScan(TableScan),
- /// Produces no rows: An empty relation with an empty schema
- EmptyRelation(EmptyRelation),
- /// Aliased relation provides, or changes, the name of a relation.
- SubqueryAlias(SubqueryAlias),
- /// Produces the first `n` tuples from its input and discards the rest.
- Limit(Limit),
- /// Creates an external table.
- CreateExternalTable(CreateExternalTable),
- /// Creates an in memory table.
- CreateMemoryTable(CreateMemoryTable),
- /// Creates a new catalog schema.
- CreateCatalogSchema(CreateCatalogSchema),
- /// Creates a new catalog (aka "Database").
- CreateCatalog(CreateCatalog),
- /// Drops a table.
- DropTable(DropTable),
- /// Values expression. See
- /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
- /// documentation for more details.
- Values(Values),
- /// Produces a relation with string representations of
- /// various parts of the plan
- Explain(Explain),
- /// Runs the actual plan, and then prints the physical plan with
- /// with execution metrics.
- Analyze(Analyze),
- /// Extension operator defined outside of DataFusion
- Extension(Extension),
-}
-
-impl LogicalPlan {
- /// Get a reference to the logical plan's schema
- pub fn schema(&self) -> &DFSchemaRef {
- match self {
- LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
- LogicalPlan::Values(Values { schema, .. }) => schema,
- LogicalPlan::TableScan(TableScan {
- projected_schema, ..
- }) => projected_schema,
- LogicalPlan::Projection(Projection { schema, .. }) => schema,
- LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
- LogicalPlan::Window(Window { schema, .. }) => schema,
- LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
- LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
- LogicalPlan::Join(Join { schema, .. }) => schema,
- LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
- LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
- LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
- LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
- LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
- schema
- }
- LogicalPlan::Explain(explain) => &explain.schema,
- LogicalPlan::Analyze(analyze) => &analyze.schema,
- LogicalPlan::Extension(extension) => extension.node.schema(),
- LogicalPlan::Union(Union { schema, .. }) => schema,
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
- input.schema()
- }
- LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
- schema
- }
- LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
- LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
- }
- }
-
- /// Get a vector of references to all schemas in every node of the logical plan
- pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
- match self {
- LogicalPlan::TableScan(TableScan {
- projected_schema, ..
- }) => vec![projected_schema],
- LogicalPlan::Values(Values { schema, .. }) => vec![schema],
- LogicalPlan::Window(Window { input, schema, .. })
- | LogicalPlan::Projection(Projection { input, schema, .. })
- | LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => {
- let mut schemas = input.all_schemas();
- schemas.insert(0, schema);
- schemas
- }
- LogicalPlan::Join(Join {
- left,
- right,
- schema,
- ..
- })
- | LogicalPlan::CrossJoin(CrossJoin {
- left,
- right,
- schema,
- }) => {
- let mut schemas = left.all_schemas();
- schemas.extend(right.all_schemas());
- schemas.insert(0, schema);
- schemas
- }
- LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
- vec![schema]
- }
- LogicalPlan::Union(Union { schema, .. }) => {
- vec![schema]
- }
- LogicalPlan::Extension(extension) => vec![extension.node.schema()],
- LogicalPlan::Explain(Explain { schema, .. })
- | LogicalPlan::Analyze(Analyze { schema, .. })
- | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
- | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
- | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. })
- | LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => {
- vec![schema]
- }
- LogicalPlan::Limit(Limit { input, .. })
- | LogicalPlan::Repartition(Repartition { input, .. })
- | LogicalPlan::Sort(Sort { input, .. })
- | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
- | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
- LogicalPlan::DropTable(_) => vec![],
- }
- }
-
- /// Returns the (fixed) output schema for explain plans
- pub fn explain_schema() -> SchemaRef {
- SchemaRef::new(Schema::new(vec![
- Field::new("plan_type", DataType::Utf8, false),
- Field::new("plan", DataType::Utf8, false),
- ]))
- }
-
- /// returns all expressions (non-recursively) in the current
- /// logical plan node. This does not include expressions in any
- /// children
- pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
- match self {
- LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
- LogicalPlan::Values(Values { values, .. }) => {
- values.iter().flatten().cloned().collect()
- }
- LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
- LogicalPlan::Repartition(Repartition {
- partitioning_scheme,
- ..
- }) => match partitioning_scheme {
- Partitioning::Hash(expr, _) => expr.clone(),
- _ => vec![],
- },
- LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
- LogicalPlan::Aggregate(Aggregate {
- group_expr,
- aggr_expr,
- ..
- }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
- LogicalPlan::Join(Join { on, .. }) => on
- .iter()
- .flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
- .collect(),
- LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
- LogicalPlan::Extension(extension) => extension.node.expressions(),
- // plans without expressions
- LogicalPlan::TableScan { .. }
- | LogicalPlan::EmptyRelation(_)
- | LogicalPlan::SubqueryAlias(_)
- | LogicalPlan::Limit(_)
- | LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::CreateMemoryTable(_)
- | LogicalPlan::CreateCatalogSchema(_)
- | LogicalPlan::CreateCatalog(_)
- | LogicalPlan::DropTable(_)
- | LogicalPlan::CrossJoin(_)
- | LogicalPlan::Analyze { .. }
- | LogicalPlan::Explain { .. }
- | LogicalPlan::Union(_) => {
- vec![]
- }
- }
- }
-
- /// returns all inputs of this `LogicalPlan` node. Does not
- /// include inputs to inputs.
- pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
- match self {
- LogicalPlan::Projection(Projection { input, .. }) => vec![input],
- LogicalPlan::Filter(Filter { input, .. }) => vec![input],
- LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
- LogicalPlan::Window(Window { input, .. }) => vec![input],
- LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
- LogicalPlan::Sort(Sort { input, .. }) => vec![input],
- LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
- LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
- LogicalPlan::Limit(Limit { input, .. }) => vec![input],
- LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
- LogicalPlan::Extension(extension) => extension.node.inputs(),
- LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
- LogicalPlan::Explain(explain) => vec![&explain.plan],
- LogicalPlan::Analyze(analyze) => vec![&analyze.input],
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
- vec![input]
- }
- // plans without inputs
- LogicalPlan::TableScan { .. }
- | LogicalPlan::EmptyRelation { .. }
- | LogicalPlan::Values { .. }
- | LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::CreateCatalogSchema(_)
- | LogicalPlan::CreateCatalog(_)
- | LogicalPlan::DropTable(_) => vec![],
- }
- }
-
- /// returns all `Using` join columns in a logical plan
- pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
- struct UsingJoinColumnVisitor {
- using_columns: Vec<HashSet<Column>>,
- }
-
- impl PlanVisitor for UsingJoinColumnVisitor {
- type Error = DataFusionError;
-
- fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
- if let LogicalPlan::Join(Join {
- join_constraint: JoinConstraint::Using,
- on,
- ..
- }) = plan
- {
- self.using_columns.push(
- on.iter()
- .flat_map(|entry| [&entry.0, &entry.1])
- .cloned()
- .collect::<HashSet<Column>>(),
- );
- }
- Ok(true)
- }
- }
-
- let mut visitor = UsingJoinColumnVisitor {
- using_columns: vec![],
- };
- self.accept(&mut visitor)?;
- Ok(visitor.using_columns)
- }
-}
-
-/// Logical partitioning schemes supported by the repartition operator.
-#[derive(Debug, Clone)]
-pub enum Partitioning {
- /// Allocate batches using a round-robin algorithm and the specified number of partitions
- RoundRobinBatch(usize),
- /// Allocate rows based on a hash of one of more expressions and the specified number
- /// of partitions.
- /// This partitioning scheme is not yet fully supported. See <https://issues.apache.org/jira/browse/ARROW-11011>
- Hash(Vec<Expr>, usize),
-}
-
-/// Trait that implements the [Visitor
-/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a
-/// depth first walk of `LogicalPlan` nodes. `pre_visit` is called
-/// before any children are visited, and then `post_visit` is called
-/// after all children have been visited.
-////
-/// To use, define a struct that implements this trait and then invoke
-/// [`LogicalPlan::accept`].
-///
-/// For example, for a logical plan like:
-///
-/// ```text
-/// Projection: #id
-/// Filter: #state Eq Utf8(\"CO\")\
-/// CsvScan: employee.csv projection=Some([0, 3])";
-/// ```
-///
-/// The sequence of visit operations would be:
-/// ```text
-/// visitor.pre_visit(Projection)
-/// visitor.pre_visit(Filter)
-/// visitor.pre_visit(CsvScan)
-/// visitor.post_visit(CsvScan)
-/// visitor.post_visit(Filter)
-/// visitor.post_visit(Projection)
-/// ```
-pub trait PlanVisitor {
- /// The type of error returned by this visitor
- type Error;
-
- /// Invoked on a logical plan before any of its child inputs have been
- /// visited. If Ok(true) is returned, the recursion continues. If
- /// Err(..) or Ok(false) are returned, the recursion stops
- /// immediately and the error, if any, is returned to `accept`
- fn pre_visit(&mut self, plan: &LogicalPlan)
- -> std::result::Result<bool, Self::Error>;
-
- /// Invoked on a logical plan after all of its child inputs have
- /// been visited. The return value is handled the same as the
- /// return value of `pre_visit`. The provided default implementation
- /// returns `Ok(true)`.
- fn post_visit(
- &mut self,
- _plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- Ok(true)
- }
-}
-
-impl LogicalPlan {
- /// returns all inputs in the logical plan. Returns Ok(true) if
- /// all nodes were visited, and Ok(false) if any call to
- /// `pre_visit` or `post_visit` returned Ok(false) and may have
- /// cut short the recursion
- pub fn accept<V>(&self, visitor: &mut V) -> std::result::Result<bool, V::Error>
- where
- V: PlanVisitor,
- {
- if !visitor.pre_visit(self)? {
- return Ok(false);
- }
-
- let recurse = match self {
- LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Repartition(Repartition { input, .. }) => {
- input.accept(visitor)?
- }
- LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Aggregate(Aggregate { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Sort(Sort { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Join(Join { left, right, .. })
- | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
- left.accept(visitor)? && right.accept(visitor)?
- }
- LogicalPlan::Union(Union { inputs, .. }) => {
- for input in inputs {
- if !input.accept(visitor)? {
- return Ok(false);
- }
- }
- true
- }
- LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
- LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
- input.accept(visitor)?
- }
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
- input.accept(visitor)?
- }
- LogicalPlan::Extension(extension) => {
- for input in extension.node.inputs() {
- if !input.accept(visitor)? {
- return Ok(false);
- }
- }
- true
- }
- LogicalPlan::Explain(explain) => explain.plan.accept(visitor)?,
- LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
- // plans without inputs
- LogicalPlan::TableScan { .. }
- | LogicalPlan::EmptyRelation(_)
- | LogicalPlan::Values(_)
- | LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::CreateCatalogSchema(_)
- | LogicalPlan::CreateCatalog(_)
- | LogicalPlan::DropTable(_) => true,
- };
- if !recurse {
- return Ok(false);
- }
-
- if !visitor.post_visit(self)? {
- return Ok(false);
- }
-
- Ok(true)
- }
-}
-
-// Various implementations for printing out LogicalPlans
-impl LogicalPlan {
- /// Return a `format`able structure that produces a single line
- /// per node. For example:
- ///
- /// ```text
- /// Projection: #employee.id
- /// Filter: #employee.state Eq Utf8(\"CO\")\
- /// CsvScan: employee projection=Some([0, 3])
- /// ```
- ///
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
- /// .filter(col("id").eq(lit(5))).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display_indent
- /// let display_string = format!("{}", plan.display_indent());
- ///
- /// assert_eq!("Filter: #foo_csv.id = Int32(5)\
- /// \n TableScan: foo_csv projection=None",
- /// display_string);
- /// ```
- pub fn display_indent(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let with_schema = false;
- let mut visitor = IndentVisitor::new(f, with_schema);
- self.0.accept(&mut visitor).unwrap();
- Ok(())
- }
- }
- Wrapper(self)
- }
-
- /// Return a `format`able structure that produces a single line
- /// per node that includes the output schema. For example:
- ///
- /// ```text
- /// Projection: #employee.id [id:Int32]\
- /// Filter: #employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
- /// TableScan: employee projection=Some([0, 3]) [id:Int32, state:Utf8]";
- /// ```
- ///
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
- /// .filter(col("id").eq(lit(5))).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display_indent_schema
- /// let display_string = format!("{}", plan.display_indent_schema());
- ///
- /// assert_eq!("Filter: #foo_csv.id = Int32(5) [id:Int32]\
- /// \n TableScan: foo_csv projection=None [id:Int32]",
- /// display_string);
- /// ```
- pub fn display_indent_schema(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let with_schema = true;
- let mut visitor = IndentVisitor::new(f, with_schema);
- self.0.accept(&mut visitor).unwrap();
- Ok(())
- }
- }
- Wrapper(self)
- }
-
- /// Return a `format`able structure that produces lines meant for
- /// graphical display using the `DOT` language. This format can be
- /// visualized using software from
- /// [`graphviz`](https://graphviz.org/)
- ///
- /// This currently produces two graphs -- one with the basic
- /// structure, and one with additional details such as schema.
- ///
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
- /// .filter(col("id").eq(lit(5))).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display_graphviz
- /// let graphviz_string = format!("{}", plan.display_graphviz());
- /// ```
- ///
- /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
- /// commands can be used to render it as a pdf:
- ///
- /// ```bash
- /// dot -Tpdf < /tmp/example.dot > /tmp/example.pdf
- /// ```
- ///
- pub fn display_graphviz(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- writeln!(
- f,
- "// Begin DataFusion GraphViz Plan (see https://graphviz.org)"
- )?;
- writeln!(f, "digraph {{")?;
-
- let mut visitor = GraphvizVisitor::new(f);
-
- visitor.pre_visit_plan("LogicalPlan")?;
- self.0.accept(&mut visitor).unwrap();
- visitor.post_visit_plan()?;
-
- visitor.set_with_schema(true);
- visitor.pre_visit_plan("Detailed LogicalPlan")?;
- self.0.accept(&mut visitor).unwrap();
- visitor.post_visit_plan()?;
-
- writeln!(f, "}}")?;
- writeln!(f, "// End DataFusion GraphViz Plan")?;
- Ok(())
- }
- }
- Wrapper(self)
- }
-
- /// Return a `format`able structure with the a human readable
- /// description of this LogicalPlan node per node, not including
- /// children. For example:
- ///
- /// ```text
- /// Projection: #id
- /// ```
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display
- /// let display_string = format!("{}", plan.display());
- ///
- /// assert_eq!("TableScan: foo.csv projection=None", display_string);
- /// ```
- pub fn display(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match &*self.0 {
- LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
- LogicalPlan::Values(Values { ref values, .. }) => {
- let str_values: Vec<_> = values
- .iter()
- // limit to only 5 values to avoid horrible display
- .take(5)
- .map(|row| {
- let item = row
- .iter()
- .map(|expr| expr.to_string())
- .collect::<Vec<_>>()
- .join(", ");
- format!("({})", item)
- })
- .collect();
-
- let elipse = if values.len() > 5 { "..." } else { "" };
- write!(f, "Values: {}{}", str_values.join(", "), elipse)
- }
-
- LogicalPlan::TableScan(TableScan {
- ref source,
- ref table_name,
- ref projection,
- ref filters,
- ref limit,
- ..
- }) => {
- write!(
- f,
- "TableScan: {} projection={:?}",
- table_name, projection
- )?;
-
- if !filters.is_empty() {
- let mut full_filter = vec![];
- let mut partial_filter = vec![];
- let mut unsupported_filters = vec![];
-
- filters.iter().for_each(|x| {
- if let Ok(t) = source.supports_filter_pushdown(x) {
- match t {
- TableProviderFilterPushDown::Exact => {
- full_filter.push(x)
- }
- TableProviderFilterPushDown::Inexact => {
- partial_filter.push(x)
- }
- TableProviderFilterPushDown::Unsupported => {
- unsupported_filters.push(x)
- }
- }
- }
- });
-
- if !full_filter.is_empty() {
- write!(f, ", full_filters={:?}", full_filter)?;
- };
- if !partial_filter.is_empty() {
- write!(f, ", partial_filters={:?}", partial_filter)?;
- }
- if !unsupported_filters.is_empty() {
- write!(
- f,
- ", unsupported_filters={:?}",
- unsupported_filters
- )?;
- }
- }
-
- if let Some(n) = limit {
- write!(f, ", limit={}", n)?;
- }
-
- Ok(())
- }
- LogicalPlan::Projection(Projection {
- ref expr, alias, ..
- }) => {
- write!(f, "Projection: ")?;
- for (i, expr_item) in expr.iter().enumerate() {
- if i > 0 {
- write!(f, ", ")?;
- }
- write!(f, "{:?}", expr_item)?;
- }
- if let Some(a) = alias {
- write!(f, ", alias={}", a)?;
- }
- Ok(())
- }
- LogicalPlan::Filter(Filter {
- predicate: ref expr,
- ..
- }) => write!(f, "Filter: {:?}", expr),
- LogicalPlan::Window(Window {
- ref window_expr, ..
- }) => {
- write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
- }
- LogicalPlan::Aggregate(Aggregate {
- ref group_expr,
- ref aggr_expr,
- ..
- }) => write!(
- f,
- "Aggregate: groupBy=[{:?}], aggr=[{:?}]",
- group_expr, aggr_expr
- ),
- LogicalPlan::Sort(Sort { expr, .. }) => {
- write!(f, "Sort: ")?;
- for (i, expr_item) in expr.iter().enumerate() {
- if i > 0 {
- write!(f, ", ")?;
- }
- write!(f, "{:?}", expr_item)?;
- }
- Ok(())
- }
- LogicalPlan::Join(Join {
- on: ref keys,
- join_constraint,
- join_type,
- ..
- }) => {
- let join_expr: Vec<String> =
- keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
- match join_constraint {
- JoinConstraint::On => {
- write!(f, "{} Join: {}", join_type, join_expr.join(", "))
- }
- JoinConstraint::Using => {
- write!(
- f,
- "{} Join: Using {}",
- join_type,
- join_expr.join(", ")
- )
- }
- }
- }
- LogicalPlan::CrossJoin(_) => {
- write!(f, "CrossJoin:")
- }
- LogicalPlan::Repartition(Repartition {
- partitioning_scheme,
- ..
- }) => match partitioning_scheme {
- Partitioning::RoundRobinBatch(n) => write!(
- f,
- "Repartition: RoundRobinBatch partition_count={}",
- n
- ),
- Partitioning::Hash(expr, n) => {
- let hash_expr: Vec<String> =
- expr.iter().map(|e| format!("{:?}", e)).collect();
- write!(
- f,
- "Repartition: Hash({}) partition_count={}",
- hash_expr.join(", "),
- n
- )
- }
- },
- LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
- LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
- write!(f, "SubqueryAlias: {}", alias)
- }
- LogicalPlan::CreateExternalTable(CreateExternalTable {
- ref name,
- ..
- }) => {
- write!(f, "CreateExternalTable: {:?}", name)
- }
- LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- name, ..
- }) => {
- write!(f, "CreateMemoryTable: {:?}", name)
- }
- LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
- schema_name,
- ..
- }) => {
- write!(f, "CreateCatalogSchema: {:?}", schema_name)
- }
- LogicalPlan::CreateCatalog(CreateCatalog {
- catalog_name, ..
- }) => {
- write!(f, "CreateCatalog: {:?}", catalog_name)
- }
- LogicalPlan::DropTable(DropTable {
- name, if_exists, ..
- }) => {
- write!(f, "DropTable: {:?} if not exist:={}", name, if_exists)
- }
- LogicalPlan::Explain { .. } => write!(f, "Explain"),
- LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
- LogicalPlan::Union(_) => write!(f, "Union"),
- LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
- }
- }
- }
- Wrapper(self)
- }
-}
-
-impl fmt::Debug for LogicalPlan {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- self.display_indent().fmt(f)
- }
-}
-
-/// Represents which type of plan, when storing multiple
-/// for use in EXPLAIN plans
-#[derive(Debug, Clone, PartialEq)]
-pub enum PlanType {
- /// The initial LogicalPlan provided to DataFusion
- InitialLogicalPlan,
- /// The LogicalPlan which results from applying an optimizer pass
- OptimizedLogicalPlan {
- /// The name of the optimizer which produced this plan
- optimizer_name: String,
- },
- /// The final, fully optimized LogicalPlan that was converted to a physical plan
- FinalLogicalPlan,
- /// The initial physical plan, prepared for execution
- InitialPhysicalPlan,
- /// The ExecutionPlan which results from applying an optimizer pass
- OptimizedPhysicalPlan {
- /// The name of the optimizer which produced this plan
- optimizer_name: String,
- },
- /// The final, fully optimized physical which would be executed
- FinalPhysicalPlan,
-}
-
-impl fmt::Display for PlanType {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match self {
- PlanType::InitialLogicalPlan => write!(f, "initial_logical_plan"),
- PlanType::OptimizedLogicalPlan { optimizer_name } => {
- write!(f, "logical_plan after {}", optimizer_name)
- }
- PlanType::FinalLogicalPlan => write!(f, "logical_plan"),
- PlanType::InitialPhysicalPlan => write!(f, "initial_physical_plan"),
- PlanType::OptimizedPhysicalPlan { optimizer_name } => {
- write!(f, "physical_plan after {}", optimizer_name)
- }
- PlanType::FinalPhysicalPlan => write!(f, "physical_plan"),
- }
- }
-}
-
-/// Represents some sort of execution plan, in String form
-#[derive(Debug, Clone, PartialEq)]
-#[allow(clippy::rc_buffer)]
-pub struct StringifiedPlan {
- /// An identifier of what type of plan this string represents
- pub plan_type: PlanType,
- /// The string representation of the plan
- pub plan: Arc<String>,
-}
-
-impl StringifiedPlan {
- /// Create a new Stringified plan of `plan_type` with string
- /// representation `plan`
- pub fn new(plan_type: PlanType, plan: impl Into<String>) -> Self {
- StringifiedPlan {
- plan_type,
- plan: Arc::new(plan.into()),
- }
- }
-
- /// returns true if this plan should be displayed. Generally
- /// `verbose_mode = true` will display all available plans
- pub fn should_display(&self, verbose_mode: bool) -> bool {
- match self.plan_type {
- PlanType::FinalLogicalPlan | PlanType::FinalPhysicalPlan => true,
- _ => verbose_mode,
- }
- }
-}
-
-/// Trait for something that can be formatted as a stringified plan
-pub trait ToStringifiedPlan {
- /// Create a stringified plan with the specified type
- fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan;
-}
-
-impl ToStringifiedPlan for LogicalPlan {
- fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
- StringifiedPlan::new(plan_type, self.display_indent().to_string())
- }
-}
-
#[cfg(test)]
mod tests {
use super::super::{col, lit, LogicalPlanBuilder};
use super::*;
+ use arrow::datatypes::{DataType, Field, Schema};
fn employee_schema() -> Schema {
Schema::new(vec![
diff --git a/datafusion/core/src/sql/parser.rs b/datafusion/core/src/sql/parser.rs
index 0f4596fe6..b0c44127e 100644
--- a/datafusion/core/src/sql/parser.rs
+++ b/datafusion/core/src/sql/parser.rs
@@ -27,7 +27,6 @@ use sqlparser::{
tokenizer::{Token, Tokenizer},
};
use std::collections::VecDeque;
-use std::str::FromStr;
// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
@@ -36,20 +35,16 @@ macro_rules! parser_err {
};
}
-impl FromStr for FileType {
- type Err = ParserError;
-
- fn from_str(s: &str) -> Result<Self, Self::Err> {
- match s.to_uppercase().as_str() {
- "PARQUET" => Ok(Self::Parquet),
- "NDJSON" => Ok(Self::NdJson),
- "CSV" => Ok(Self::CSV),
- "AVRO" => Ok(Self::Avro),
- other => Err(ParserError::ParserError(format!(
- "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}",
- other
- ))),
- }
+fn parse_file_type(s: &str) -> Result<FileType, ParserError> {
+ match s.to_uppercase().as_str() {
+ "PARQUET" => Ok(FileType::Parquet),
+ "NDJSON" => Ok(FileType::NdJson),
+ "CSV" => Ok(FileType::CSV),
+ "AVRO" => Ok(FileType::Avro),
+ other => Err(ParserError::ParserError(format!(
+ "expect one of PARQUET, AVRO, NDJSON, or CSV, found: {}",
+ other
+ ))),
}
}
@@ -334,7 +329,7 @@ impl<'a> DFParser<'a> {
/// Parses the set of valid formats
fn parse_file_format(&mut self) -> Result<FileType, ParserError> {
match self.parser.next_token() {
- Token::Word(w) => w.value.parse(),
+ Token::Word(w) => parse_file_type(&w.value),
unexpected => self.expected("one of PARQUET, NDJSON, or CSV", unexpected),
}
}
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index 81ed3d85c..7586dbf60 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -27,6 +27,7 @@ pub mod expr_fn;
pub mod field_util;
pub mod function;
mod literal;
+pub mod logical_plan;
mod nullif;
mod operator;
mod signature;
@@ -56,3 +57,5 @@ pub use udaf::AggregateUDF;
pub use udf::ScalarUDF;
pub use window_frame::{WindowFrame, WindowFrameBound, WindowFrameUnits};
pub use window_function::{BuiltInWindowFunction, WindowFunction};
+
+pub use logical_plan::{LogicalPlan, PlanVisitor};
diff --git a/datafusion/core/src/logical_plan/display.rs b/datafusion/expr/src/logical_plan/display.rs
similarity index 98%
rename from datafusion/core/src/logical_plan/display.rs
rename to datafusion/expr/src/logical_plan/display.rs
index 8178ef448..b24a689d6 100644
--- a/datafusion/core/src/logical_plan/display.rs
+++ b/datafusion/expr/src/logical_plan/display.rs
@@ -16,7 +16,7 @@
// under the License.
//! This module provides logic for displaying LogicalPlans in various styles
-use super::{LogicalPlan, PlanVisitor};
+use crate::{LogicalPlan, PlanVisitor};
use arrow::datatypes::Schema;
use std::fmt;
@@ -82,7 +82,7 @@ impl<'a, 'b> PlanVisitor for IndentVisitor<'a, 'b> {
///
/// ```
/// use arrow::datatypes::{Field, Schema, DataType};
-/// # use datafusion::logical_plan::display_schema;
+/// # use datafusion_expr::logical_plan::display_schema;
/// let schema = Schema::new(vec![
/// Field::new("id", DataType::Int32, false),
/// Field::new("first_name", DataType::Utf8, true),
diff --git a/datafusion/core/src/logical_plan/extension.rs b/datafusion/expr/src/logical_plan/extension.rs
similarity index 97%
rename from datafusion/core/src/logical_plan/extension.rs
rename to datafusion/expr/src/logical_plan/extension.rs
index ee19ad43e..73ca8b9f2 100644
--- a/datafusion/core/src/logical_plan/extension.rs
+++ b/datafusion/expr/src/logical_plan/extension.rs
@@ -16,8 +16,8 @@
// under the License.
//! This module defines the interface for logical nodes
-use super::{Expr, LogicalPlan};
-use crate::logical_plan::DFSchemaRef;
+use crate::{Expr, LogicalPlan};
+use datafusion_common::DFSchemaRef;
use std::{any::Any, collections::HashSet, fmt, sync::Arc};
/// This defines the interface for `LogicalPlan` nodes that can be
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
new file mode 100644
index 000000000..525740274
--- /dev/null
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod display;
+mod extension;
+mod plan;
+
+pub use plan::{
+ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
+ CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension, FileType,
+ Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
+ PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, SubqueryAlias,
+ TableScan, ToStringifiedPlan, Union, Values, Window,
+};
+
+pub use display::display_schema;
+
+pub use extension::UserDefinedLogicalNode;
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
similarity index 78%
copy from datafusion/core/src/logical_plan/plan.rs
copy to datafusion/expr/src/logical_plan/plan.rs
index a26d47358..8fefac67d 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -14,1207 +14,1142 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-//! This module contains the `LogicalPlan` enum that describes queries
-//! via a logical query plan.
-
-use super::display::{GraphvizVisitor, IndentVisitor};
-use super::expr::{Column, Expr};
-use super::extension::UserDefinedLogicalNode;
-use crate::datasource::TableProvider;
-use crate::error::DataFusionError;
-use crate::logical_expr::TableProviderFilterPushDown;
-use crate::logical_plan::dfschema::DFSchemaRef;
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use datafusion_expr::TableSource;
-use std::any::Any;
-use std::fmt::Formatter;
-use std::{
- collections::HashSet,
- fmt::{self, Display},
- sync::Arc,
-};
-
-/// Join type
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum JoinType {
- /// Inner Join
- Inner,
- /// Left Join
- Left,
- /// Right Join
- Right,
- /// Full Join
- Full,
- /// Semi Join
- Semi,
- /// Anti Join
- Anti,
-}
-
-impl Display for JoinType {
- fn fmt(&self, f: &mut Formatter) -> fmt::Result {
- let join_type = match self {
- JoinType::Inner => "Inner",
- JoinType::Left => "Left",
- JoinType::Right => "Right",
- JoinType::Full => "Full",
- JoinType::Semi => "Semi",
- JoinType::Anti => "Anti",
- };
- write!(f, "{}", join_type)
- }
-}
-/// Join constraint
-#[derive(Debug, Clone, Copy)]
-pub enum JoinConstraint {
- /// Join ON
- On,
- /// Join USING
- Using,
-}
-
-/// Evaluates an arbitrary list of expressions (essentially a
-/// SELECT with an expression list) on its input.
-#[derive(Clone)]
-pub struct Projection {
- /// The list of expressions
- pub expr: Vec<Expr>,
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The schema description of the output
- pub schema: DFSchemaRef,
- /// Projection output relation alias
- pub alias: Option<String>,
-}
-
-/// Aliased subquery
-#[derive(Clone)]
-pub struct SubqueryAlias {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The alias for the input relation
- pub alias: String,
- /// The schema with qualified field names
- pub schema: DFSchemaRef,
-}
+use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor};
+use crate::logical_plan::extension::UserDefinedLogicalNode;
+use crate::{Expr, TableProviderFilterPushDown, TableSource};
+use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion_common::{Column, DFSchemaRef, DataFusionError};
+use std::collections::HashSet;
+///! Logical plan types
+use std::fmt::{self, Display, Formatter};
+use std::sync::Arc;
-/// Filters rows from its input that do not match an
-/// expression (essentially a WHERE clause with a predicate
-/// expression).
+/// A LogicalPlan represents the different types of relational
+/// operators (such as Projection, Filter, etc) and can be created by
+/// the SQL query planner and the DataFrame API.
///
-/// Semantically, `<predicate>` is evaluated for each row of the input;
-/// If the value of `<predicate>` is true, the input row is passed to
-/// the output. If the value of `<predicate>` is false, the row is
-/// discarded.
-#[derive(Clone)]
-pub struct Filter {
- /// The predicate expression, which must have Boolean type.
- pub predicate: Expr,
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
-}
-
-/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
+/// A LogicalPlan represents transforming an input relation (table) to
+/// an output relation (table) with a (potentially) different
+/// schema. A plan represents a dataflow tree where data flows
+/// from leaves up to the root to produce the query result.
#[derive(Clone)]
-pub struct Window {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The window function expression
- pub window_expr: Vec<Expr>,
- /// The schema description of the window output
- pub schema: DFSchemaRef,
-}
-
-/// DataFusion default table source, wrapping TableProvider
-///
-/// This structure adapts a `TableProvider` (physical plan trait) to the `TableSource`
-/// (logical plan trait)
-pub struct DefaultTableSource {
- /// table provider
- pub table_provider: Arc<dyn TableProvider>,
-}
-
-impl DefaultTableSource {
- /// Create a new DefaultTableSource to wrap a TableProvider
- pub fn new(table_provider: Arc<dyn TableProvider>) -> Self {
- Self { table_provider }
- }
+pub enum LogicalPlan {
+ /// Evaluates an arbitrary list of expressions (essentially a
+ /// SELECT with an expression list) on its input.
+ Projection(Projection),
+ /// Filters rows from its input that do not match an
+ /// expression (essentially a WHERE clause with a predicate
+ /// expression).
+ ///
+ /// Semantically, `<predicate>` is evaluated for each row of the input;
+ /// If the value of `<predicate>` is true, the input row is passed to
+ /// the output. If the value of `<predicate>` is false, the row is
+ /// discarded.
+ Filter(Filter),
+ /// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
+ Window(Window),
+ /// Aggregates its input based on a set of grouping and aggregate
+ /// expressions (e.g. SUM).
+ Aggregate(Aggregate),
+ /// Sorts its input according to a list of sort expressions.
+ Sort(Sort),
+ /// Join two logical plans on one or more join columns
+ Join(Join),
+ /// Apply Cross Join to two logical plans
+ CrossJoin(CrossJoin),
+ /// Repartition the plan based on a partitioning scheme.
+ Repartition(Repartition),
+ /// Union multiple inputs
+ Union(Union),
+ /// Produces rows from a table provider by reference or from the context
+ TableScan(TableScan),
+ /// Produces no rows: An empty relation with an empty schema
+ EmptyRelation(EmptyRelation),
+ /// Aliased relation provides, or changes, the name of a relation.
+ SubqueryAlias(SubqueryAlias),
+ /// Produces the first `n` tuples from its input and discards the rest.
+ Limit(Limit),
+ /// Creates an external table.
+ CreateExternalTable(CreateExternalTable),
+ /// Creates an in memory table.
+ CreateMemoryTable(CreateMemoryTable),
+ /// Creates a new catalog schema.
+ CreateCatalogSchema(CreateCatalogSchema),
+ /// Creates a new catalog (aka "Database").
+ CreateCatalog(CreateCatalog),
+ /// Drops a table.
+ DropTable(DropTable),
+ /// Values expression. See
+ /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
+ /// documentation for more details.
+ Values(Values),
+ /// Produces a relation with string representations of
+ /// various parts of the plan
+ Explain(Explain),
+ /// Runs the actual plan, and then prints the physical plan with
+ /// with execution metrics.
+ Analyze(Analyze),
+ /// Extension operator defined outside of DataFusion
+ Extension(Extension),
}
-impl TableSource for DefaultTableSource {
- /// Returns the table source as [`Any`](std::any::Any) so that it can be
- /// downcast to a specific implementation.
- fn as_any(&self) -> &dyn Any {
- self
+impl LogicalPlan {
+ /// Get a reference to the logical plan's schema
+ pub fn schema(&self) -> &DFSchemaRef {
+ match self {
+ LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
+ LogicalPlan::Values(Values { schema, .. }) => schema,
+ LogicalPlan::TableScan(TableScan {
+ projected_schema, ..
+ }) => projected_schema,
+ LogicalPlan::Projection(Projection { schema, .. }) => schema,
+ LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
+ LogicalPlan::Window(Window { schema, .. }) => schema,
+ LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
+ LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
+ LogicalPlan::Join(Join { schema, .. }) => schema,
+ LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
+ LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
+ LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+ LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
+ LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
+ schema
+ }
+ LogicalPlan::Explain(explain) => &explain.schema,
+ LogicalPlan::Analyze(analyze) => &analyze.schema,
+ LogicalPlan::Extension(extension) => extension.node.schema(),
+ LogicalPlan::Union(Union { schema, .. }) => schema,
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+ input.schema()
+ }
+ LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
+ schema
+ }
+ LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
+ LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
+ }
}
- /// Get a reference to the schema for this table
- fn schema(&self) -> SchemaRef {
- self.table_provider.schema()
+ /// Get a vector of references to all schemas in every node of the logical plan
+ pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
+ match self {
+ LogicalPlan::TableScan(TableScan {
+ projected_schema, ..
+ }) => vec![projected_schema],
+ LogicalPlan::Values(Values { schema, .. }) => vec![schema],
+ LogicalPlan::Window(Window { input, schema, .. })
+ | LogicalPlan::Projection(Projection { input, schema, .. })
+ | LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => {
+ let mut schemas = input.all_schemas();
+ schemas.insert(0, schema);
+ schemas
+ }
+ LogicalPlan::Join(Join {
+ left,
+ right,
+ schema,
+ ..
+ })
+ | LogicalPlan::CrossJoin(CrossJoin {
+ left,
+ right,
+ schema,
+ }) => {
+ let mut schemas = left.all_schemas();
+ schemas.extend(right.all_schemas());
+ schemas.insert(0, schema);
+ schemas
+ }
+ LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
+ vec![schema]
+ }
+ LogicalPlan::Union(Union { schema, .. }) => {
+ vec![schema]
+ }
+ LogicalPlan::Extension(extension) => vec![extension.node.schema()],
+ LogicalPlan::Explain(Explain { schema, .. })
+ | LogicalPlan::Analyze(Analyze { schema, .. })
+ | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
+ | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
+ | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. })
+ | LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => {
+ vec![schema]
+ }
+ LogicalPlan::Limit(Limit { input, .. })
+ | LogicalPlan::Repartition(Repartition { input, .. })
+ | LogicalPlan::Sort(Sort { input, .. })
+ | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
+ | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
+ LogicalPlan::DropTable(_) => vec![],
+ }
}
- /// Tests whether the table provider can make use of a filter expression
- /// to optimise data retrieval.
- fn supports_filter_pushdown(
- &self,
- filter: &Expr,
- ) -> datafusion_common::Result<TableProviderFilterPushDown> {
- self.table_provider.supports_filter_pushdown(filter)
+ /// Returns the (fixed) output schema for explain plans
+ pub fn explain_schema() -> SchemaRef {
+ SchemaRef::new(Schema::new(vec![
+ Field::new("plan_type", DataType::Utf8, false),
+ Field::new("plan", DataType::Utf8, false),
+ ]))
}
-}
-
-/// Wrap TableProvider in TableSource
-pub fn provider_as_source(
- table_provider: Arc<dyn TableProvider>,
-) -> Arc<dyn TableSource> {
- Arc::new(DefaultTableSource::new(table_provider))
-}
-/// Attempt to downcast a TableSource to DefaultTableSource and access the
-/// TableProvider. This will only work with a TableSource created by DataFusion.
-pub fn source_as_provider(
- source: &Arc<dyn TableSource>,
-) -> datafusion_common::Result<Arc<dyn TableProvider>> {
- match source
- .as_ref()
- .as_any()
- .downcast_ref::<DefaultTableSource>()
- {
- Some(source) => Ok(source.table_provider.clone()),
- _ => Err(DataFusionError::Internal(
- "TableSource was not DefaultTableSource".to_string(),
- )),
+ /// returns all expressions (non-recursively) in the current
+ /// logical plan node. This does not include expressions in any
+ /// children
+ pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
+ match self {
+ LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
+ LogicalPlan::Values(Values { values, .. }) => {
+ values.iter().flatten().cloned().collect()
+ }
+ LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
+ LogicalPlan::Repartition(Repartition {
+ partitioning_scheme,
+ ..
+ }) => match partitioning_scheme {
+ Partitioning::Hash(expr, _) => expr.clone(),
+ _ => vec![],
+ },
+ LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
+ LogicalPlan::Aggregate(Aggregate {
+ group_expr,
+ aggr_expr,
+ ..
+ }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
+ LogicalPlan::Join(Join { on, .. }) => on
+ .iter()
+ .flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
+ .collect(),
+ LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
+ LogicalPlan::Extension(extension) => extension.node.expressions(),
+ // plans without expressions
+ LogicalPlan::TableScan { .. }
+ | LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::SubqueryAlias(_)
+ | LogicalPlan::Limit(_)
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateMemoryTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
+ | LogicalPlan::DropTable(_)
+ | LogicalPlan::CrossJoin(_)
+ | LogicalPlan::Analyze { .. }
+ | LogicalPlan::Explain { .. }
+ | LogicalPlan::Union(_) => {
+ vec![]
+ }
+ }
}
-}
-/// Produces rows from a table provider by reference or from the context
-#[derive(Clone)]
-pub struct TableScan {
- /// The name of the table
- pub table_name: String,
- /// The source of the table
- pub source: Arc<dyn TableSource>,
- /// Optional column indices to use as a projection
- pub projection: Option<Vec<usize>>,
- /// The schema description of the output
- pub projected_schema: DFSchemaRef,
- /// Optional expressions to be used as filters by the table provider
- pub filters: Vec<Expr>,
- /// Optional limit to skip reading
- pub limit: Option<usize>,
-}
+ /// returns all inputs of this `LogicalPlan` node. Does not
+ /// include inputs to inputs.
+ pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
+ match self {
+ LogicalPlan::Projection(Projection { input, .. }) => vec![input],
+ LogicalPlan::Filter(Filter { input, .. }) => vec![input],
+ LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
+ LogicalPlan::Window(Window { input, .. }) => vec![input],
+ LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
+ LogicalPlan::Sort(Sort { input, .. }) => vec![input],
+ LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
+ LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
+ LogicalPlan::Limit(Limit { input, .. }) => vec![input],
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
+ LogicalPlan::Extension(extension) => extension.node.inputs(),
+ LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
+ LogicalPlan::Explain(explain) => vec![&explain.plan],
+ LogicalPlan::Analyze(analyze) => vec![&analyze.input],
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+ vec![input]
+ }
+ // plans without inputs
+ LogicalPlan::TableScan { .. }
+ | LogicalPlan::EmptyRelation { .. }
+ | LogicalPlan::Values { .. }
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
+ | LogicalPlan::DropTable(_) => vec![],
+ }
+ }
-/// Apply Cross Join to two logical plans
-#[derive(Clone)]
-pub struct CrossJoin {
- /// Left input
- pub left: Arc<LogicalPlan>,
- /// Right input
- pub right: Arc<LogicalPlan>,
- /// The output schema, containing fields from the left and right inputs
- pub schema: DFSchemaRef,
-}
+ /// returns all `Using` join columns in a logical plan
+ pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
+ struct UsingJoinColumnVisitor {
+ using_columns: Vec<HashSet<Column>>,
+ }
-/// Repartition the plan based on a partitioning scheme.
-#[derive(Clone)]
-pub struct Repartition {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// The partitioning scheme
- pub partitioning_scheme: Partitioning,
-}
+ impl PlanVisitor for UsingJoinColumnVisitor {
+ type Error = DataFusionError;
-/// Union multiple inputs
-#[derive(Clone)]
-pub struct Union {
- /// Inputs to merge
- pub inputs: Vec<LogicalPlan>,
- /// Union schema. Should be the same for all inputs.
- pub schema: DFSchemaRef,
- /// Union output relation alias
- pub alias: Option<String>,
-}
+ fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
+ if let LogicalPlan::Join(Join {
+ join_constraint: JoinConstraint::Using,
+ on,
+ ..
+ }) = plan
+ {
+ self.using_columns.push(
+ on.iter()
+ .flat_map(|entry| [&entry.0, &entry.1])
+ .cloned()
+ .collect::<HashSet<Column>>(),
+ );
+ }
+ Ok(true)
+ }
+ }
-/// Creates an in memory table.
-#[derive(Clone)]
-pub struct CreateMemoryTable {
- /// The table name
- pub name: String,
- /// The logical plan
- pub input: Arc<LogicalPlan>,
- /// Option to not error if table already exists
- pub if_not_exists: bool,
+ let mut visitor = UsingJoinColumnVisitor {
+ using_columns: vec![],
+ };
+ self.accept(&mut visitor)?;
+ Ok(visitor.using_columns)
+ }
}
-/// Types of files to parse as DataFrames
-#[derive(Debug, Clone, Copy, PartialEq)]
-pub enum FileType {
- /// Newline-delimited JSON
- NdJson,
- /// Apache Parquet columnar storage
- Parquet,
- /// Comma separated values
- CSV,
- /// Avro binary records
- Avro,
-}
+/// Trait that implements the [Visitor
+/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a
+/// depth first walk of `LogicalPlan` nodes. `pre_visit` is called
+/// before any children are visited, and then `post_visit` is called
+/// after all children have been visited.
+////
+/// To use, define a struct that implements this trait and then invoke
+/// [`LogicalPlan::accept`].
+///
+/// For example, for a logical plan like:
+///
+/// ```text
+/// Projection: #id
+/// Filter: #state Eq Utf8(\"CO\")\
+/// CsvScan: employee.csv projection=Some([0, 3])";
+/// ```
+///
+/// The sequence of visit operations would be:
+/// ```text
+/// visitor.pre_visit(Projection)
+/// visitor.pre_visit(Filter)
+/// visitor.pre_visit(CsvScan)
+/// visitor.post_visit(CsvScan)
+/// visitor.post_visit(Filter)
+/// visitor.post_visit(Projection)
+/// ```
+pub trait PlanVisitor {
+ /// The type of error returned by this visitor
+ type Error;
-/// Creates an external table.
-#[derive(Clone)]
-pub struct CreateExternalTable {
- /// The table schema
- pub schema: DFSchemaRef,
- /// The table name
- pub name: String,
- /// The physical location
- pub location: String,
- /// The file type of physical file
- pub file_type: FileType,
- /// Whether the CSV file contains a header
- pub has_header: bool,
- /// Delimiter for CSV
- pub delimiter: char,
- /// Partition Columns
- pub table_partition_cols: Vec<String>,
- /// Option to not error if table already exists
- pub if_not_exists: bool,
-}
+ /// Invoked on a logical plan before any of its child inputs have been
+ /// visited. If Ok(true) is returned, the recursion continues. If
+ /// Err(..) or Ok(false) are returned, the recursion stops
+ /// immediately and the error, if any, is returned to `accept`
+ fn pre_visit(&mut self, plan: &LogicalPlan)
+ -> std::result::Result<bool, Self::Error>;
-/// Creates a schema.
-#[derive(Clone)]
-pub struct CreateCatalogSchema {
- /// The table schema
- pub schema_name: String,
- /// Do nothing (except issuing a notice) if a schema with the same name already exists
- pub if_not_exists: bool,
- /// Empty schema
- pub schema: DFSchemaRef,
+ /// Invoked on a logical plan after all of its child inputs have
+ /// been visited. The return value is handled the same as the
+ /// return value of `pre_visit`. The provided default implementation
+ /// returns `Ok(true)`.
+ fn post_visit(
+ &mut self,
+ _plan: &LogicalPlan,
+ ) -> std::result::Result<bool, Self::Error> {
+ Ok(true)
+ }
}
-/// Creates a catalog (aka "Database").
-#[derive(Clone)]
-pub struct CreateCatalog {
- /// The catalog name
- pub catalog_name: String,
- /// Do nothing (except issuing a notice) if a schema with the same name already exists
- pub if_not_exists: bool,
- /// Empty schema
- pub schema: DFSchemaRef,
-}
+impl LogicalPlan {
+ /// returns all inputs in the logical plan. Returns Ok(true) if
+ /// all nodes were visited, and Ok(false) if any call to
+ /// `pre_visit` or `post_visit` returned Ok(false) and may have
+ /// cut short the recursion
+ pub fn accept<V>(&self, visitor: &mut V) -> std::result::Result<bool, V::Error>
+ where
+ V: PlanVisitor,
+ {
+ if !visitor.pre_visit(self)? {
+ return Ok(false);
+ }
-/// Drops a table.
-#[derive(Clone)]
-pub struct DropTable {
- /// The table name
- pub name: String,
- /// If the table exists
- pub if_exists: bool,
- /// Dummy schema
- pub schema: DFSchemaRef,
-}
+ let recurse = match self {
+ LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Repartition(Repartition { input, .. }) => {
+ input.accept(visitor)?
+ }
+ LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Aggregate(Aggregate { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Sort(Sort { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Join(Join { left, right, .. })
+ | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
+ left.accept(visitor)? && right.accept(visitor)?
+ }
+ LogicalPlan::Union(Union { inputs, .. }) => {
+ for input in inputs {
+ if !input.accept(visitor)? {
+ return Ok(false);
+ }
+ }
+ true
+ }
+ LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
+ input.accept(visitor)?
+ }
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
+ input.accept(visitor)?
+ }
+ LogicalPlan::Extension(extension) => {
+ for input in extension.node.inputs() {
+ if !input.accept(visitor)? {
+ return Ok(false);
+ }
+ }
+ true
+ }
+ LogicalPlan::Explain(explain) => explain.plan.accept(visitor)?,
+ LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
+ // plans without inputs
+ LogicalPlan::TableScan { .. }
+ | LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Values(_)
+ | LogicalPlan::CreateExternalTable(_)
+ | LogicalPlan::CreateCatalogSchema(_)
+ | LogicalPlan::CreateCatalog(_)
+ | LogicalPlan::DropTable(_) => true,
+ };
+ if !recurse {
+ return Ok(false);
+ }
-/// Produces a relation with string representations of
-/// various parts of the plan
-#[derive(Clone)]
-pub struct Explain {
- /// Should extra (detailed, intermediate plans) be included?
- pub verbose: bool,
- /// The logical plan that is being EXPLAIN'd
- pub plan: Arc<LogicalPlan>,
- /// Represent the various stages plans have gone through
- pub stringified_plans: Vec<StringifiedPlan>,
- /// The output schema of the explain (2 columns of text)
- pub schema: DFSchemaRef,
-}
+ if !visitor.post_visit(self)? {
+ return Ok(false);
+ }
-/// Runs the actual plan, and then prints the physical plan with
-/// with execution metrics.
-#[derive(Clone)]
-pub struct Analyze {
- /// Should extra detail be included?
- pub verbose: bool,
- /// The logical plan that is being EXPLAIN ANALYZE'd
- pub input: Arc<LogicalPlan>,
- /// The output schema of the explain (2 columns of text)
- pub schema: DFSchemaRef,
+ Ok(true)
+ }
}
-/// Extension operator defined outside of DataFusion
-#[derive(Clone)]
-pub struct Extension {
- /// The runtime extension operator
- pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
-}
+// Various implementations for printing out LogicalPlans
+impl LogicalPlan {
+ /// Return a `format`able structure that produces a single line
+ /// per node. For example:
+ ///
+ /// ```text
+ /// Projection: #employee.id
+ /// Filter: #employee.state Eq Utf8(\"CO\")\
+ /// CsvScan: employee projection=Some([0, 3])
+ /// ```
+ ///
+ /// ```ignore
+ /// use arrow::datatypes::{Field, Schema, DataType};
+ /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
+ /// let schema = Schema::new(vec![
+ /// Field::new("id", DataType::Int32, false),
+ /// ]);
+ /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
+ /// .filter(col("id").eq(lit(5))).unwrap()
+ /// .build().unwrap();
+ ///
+ /// // Format using display_indent
+ /// let display_string = format!("{}", plan.display_indent());
+ ///
+ /// assert_eq!("Filter: #foo_csv.id = Int32(5)\
+ /// \n TableScan: foo_csv projection=None",
+ /// display_string);
+ /// ```
+ pub fn display_indent(&self) -> impl fmt::Display + '_ {
+ // Boilerplate structure to wrap LogicalPlan with something
+ // that that can be formatted
+ struct Wrapper<'a>(&'a LogicalPlan);
+ impl<'a> fmt::Display for Wrapper<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let with_schema = false;
+ let mut visitor = IndentVisitor::new(f, with_schema);
+ self.0.accept(&mut visitor).unwrap();
+ Ok(())
+ }
+ }
+ Wrapper(self)
+ }
-/// Produces no rows: An empty relation with an empty schema
-#[derive(Clone)]
-pub struct EmptyRelation {
- /// Whether to produce a placeholder row
- pub produce_one_row: bool,
- /// The schema description of the output
- pub schema: DFSchemaRef,
-}
+ /// Return a `format`able structure that produces a single line
+ /// per node that includes the output schema. For example:
+ ///
+ /// ```text
+ /// Projection: #employee.id [id:Int32]\
+ /// Filter: #employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
+ /// TableScan: employee projection=Some([0, 3]) [id:Int32, state:Utf8]";
+ /// ```
+ ///
+ /// ```ignore
+ /// use arrow::datatypes::{Field, Schema, DataType};
+ /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
+ /// let schema = Schema::new(vec![
+ /// Field::new("id", DataType::Int32, false),
+ /// ]);
+ /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
+ /// .filter(col("id").eq(lit(5))).unwrap()
+ /// .build().unwrap();
+ ///
+ /// // Format using display_indent_schema
+ /// let display_string = format!("{}", plan.display_indent_schema());
+ ///
+ /// assert_eq!("Filter: #foo_csv.id = Int32(5) [id:Int32]\
+ /// \n TableScan: foo_csv projection=None [id:Int32]",
+ /// display_string);
+ /// ```
+ pub fn display_indent_schema(&self) -> impl fmt::Display + '_ {
+ // Boilerplate structure to wrap LogicalPlan with something
+ // that that can be formatted
+ struct Wrapper<'a>(&'a LogicalPlan);
+ impl<'a> fmt::Display for Wrapper<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ let with_schema = true;
+ let mut visitor = IndentVisitor::new(f, with_schema);
+ self.0.accept(&mut visitor).unwrap();
+ Ok(())
+ }
+ }
+ Wrapper(self)
+ }
-/// Produces the first `n` tuples from its input and discards the rest.
-#[derive(Clone)]
-pub struct Limit {
- /// The limit
- pub n: usize,
- /// The logical plan
- pub input: Arc<LogicalPlan>,
-}
+ /// Return a `format`able structure that produces lines meant for
+ /// graphical display using the `DOT` language. This format can be
+ /// visualized using software from
+ /// [`graphviz`](https://graphviz.org/)
+ ///
+ /// This currently produces two graphs -- one with the basic
+ /// structure, and one with additional details such as schema.
+ ///
+ /// ```ignore
+ /// use arrow::datatypes::{Field, Schema, DataType};
+ /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
+ /// let schema = Schema::new(vec![
+ /// Field::new("id", DataType::Int32, false),
+ /// ]);
+ /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
+ /// .filter(col("id").eq(lit(5))).unwrap()
+ /// .build().unwrap();
+ ///
+ /// // Format using display_graphviz
+ /// let graphviz_string = format!("{}", plan.display_graphviz());
+ /// ```
+ ///
+ /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
+ /// commands can be used to render it as a pdf:
+ ///
+ /// ```bash
+ /// dot -Tpdf < /tmp/example.dot > /tmp/example.pdf
+ /// ```
+ ///
+ pub fn display_graphviz(&self) -> impl fmt::Display + '_ {
+ // Boilerplate structure to wrap LogicalPlan with something
+ // that that can be formatted
+ struct Wrapper<'a>(&'a LogicalPlan);
+ impl<'a> fmt::Display for Wrapper<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ writeln!(
+ f,
+ "// Begin DataFusion GraphViz Plan (see https://graphviz.org)"
+ )?;
+ writeln!(f, "digraph {{")?;
-/// Values expression. See
-/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
-/// documentation for more details.
-#[derive(Clone)]
-pub struct Values {
- /// The table schema
- pub schema: DFSchemaRef,
- /// Values
- pub values: Vec<Vec<Expr>>,
-}
+ let mut visitor = GraphvizVisitor::new(f);
-/// Aggregates its input based on a set of grouping and aggregate
-/// expressions (e.g. SUM).
-#[derive(Clone)]
-pub struct Aggregate {
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
- /// Grouping expressions
- pub group_expr: Vec<Expr>,
- /// Aggregate expressions
- pub aggr_expr: Vec<Expr>,
- /// The schema description of the aggregate output
- pub schema: DFSchemaRef,
-}
-
-/// Sorts its input according to a list of sort expressions.
-#[derive(Clone)]
-pub struct Sort {
- /// The sort expressions
- pub expr: Vec<Expr>,
- /// The incoming logical plan
- pub input: Arc<LogicalPlan>,
-}
-
-/// Join two logical plans on one or more join columns
-#[derive(Clone)]
-pub struct Join {
- /// Left input
- pub left: Arc<LogicalPlan>,
- /// Right input
- pub right: Arc<LogicalPlan>,
- /// Equijoin clause expressed as pairs of (left, right) join columns
- pub on: Vec<(Column, Column)>,
- /// Join type
- pub join_type: JoinType,
- /// Join constraint
- pub join_constraint: JoinConstraint,
- /// The output schema, containing fields from the left and right inputs
- pub schema: DFSchemaRef,
- /// If null_equals_null is true, null == null else null != null
- pub null_equals_null: bool,
-}
-
-/// A LogicalPlan represents the different types of relational
-/// operators (such as Projection, Filter, etc) and can be created by
-/// the SQL query planner and the DataFrame API.
-///
-/// A LogicalPlan represents transforming an input relation (table) to
-/// an output relation (table) with a (potentially) different
-/// schema. A plan represents a dataflow tree where data flows
-/// from leaves up to the root to produce the query result.
-#[derive(Clone)]
-pub enum LogicalPlan {
- /// Evaluates an arbitrary list of expressions (essentially a
- /// SELECT with an expression list) on its input.
- Projection(Projection),
- /// Filters rows from its input that do not match an
- /// expression (essentially a WHERE clause with a predicate
- /// expression).
- ///
- /// Semantically, `<predicate>` is evaluated for each row of the input;
- /// If the value of `<predicate>` is true, the input row is passed to
- /// the output. If the value of `<predicate>` is false, the row is
- /// discarded.
- Filter(Filter),
- /// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
- Window(Window),
- /// Aggregates its input based on a set of grouping and aggregate
- /// expressions (e.g. SUM).
- Aggregate(Aggregate),
- /// Sorts its input according to a list of sort expressions.
- Sort(Sort),
- /// Join two logical plans on one or more join columns
- Join(Join),
- /// Apply Cross Join to two logical plans
- CrossJoin(CrossJoin),
- /// Repartition the plan based on a partitioning scheme.
- Repartition(Repartition),
- /// Union multiple inputs
- Union(Union),
- /// Produces rows from a table provider by reference or from the context
- TableScan(TableScan),
- /// Produces no rows: An empty relation with an empty schema
- EmptyRelation(EmptyRelation),
- /// Aliased relation provides, or changes, the name of a relation.
- SubqueryAlias(SubqueryAlias),
- /// Produces the first `n` tuples from its input and discards the rest.
- Limit(Limit),
- /// Creates an external table.
- CreateExternalTable(CreateExternalTable),
- /// Creates an in memory table.
- CreateMemoryTable(CreateMemoryTable),
- /// Creates a new catalog schema.
- CreateCatalogSchema(CreateCatalogSchema),
- /// Creates a new catalog (aka "Database").
- CreateCatalog(CreateCatalog),
- /// Drops a table.
- DropTable(DropTable),
- /// Values expression. See
- /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
- /// documentation for more details.
- Values(Values),
- /// Produces a relation with string representations of
- /// various parts of the plan
- Explain(Explain),
- /// Runs the actual plan, and then prints the physical plan with
- /// with execution metrics.
- Analyze(Analyze),
- /// Extension operator defined outside of DataFusion
- Extension(Extension),
-}
-
-impl LogicalPlan {
- /// Get a reference to the logical plan's schema
- pub fn schema(&self) -> &DFSchemaRef {
- match self {
- LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema,
- LogicalPlan::Values(Values { schema, .. }) => schema,
- LogicalPlan::TableScan(TableScan {
- projected_schema, ..
- }) => projected_schema,
- LogicalPlan::Projection(Projection { schema, .. }) => schema,
- LogicalPlan::Filter(Filter { input, .. }) => input.schema(),
- LogicalPlan::Window(Window { schema, .. }) => schema,
- LogicalPlan::Aggregate(Aggregate { schema, .. }) => schema,
- LogicalPlan::Sort(Sort { input, .. }) => input.schema(),
- LogicalPlan::Join(Join { schema, .. }) => schema,
- LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
- LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
- LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
- LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
- LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
- schema
- }
- LogicalPlan::Explain(explain) => &explain.schema,
- LogicalPlan::Analyze(analyze) => &analyze.schema,
- LogicalPlan::Extension(extension) => extension.node.schema(),
- LogicalPlan::Union(Union { schema, .. }) => schema,
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
- input.schema()
- }
- LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) => {
- schema
- }
- LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => schema,
- LogicalPlan::DropTable(DropTable { schema, .. }) => schema,
- }
- }
-
- /// Get a vector of references to all schemas in every node of the logical plan
- pub fn all_schemas(&self) -> Vec<&DFSchemaRef> {
- match self {
- LogicalPlan::TableScan(TableScan {
- projected_schema, ..
- }) => vec![projected_schema],
- LogicalPlan::Values(Values { schema, .. }) => vec![schema],
- LogicalPlan::Window(Window { input, schema, .. })
- | LogicalPlan::Projection(Projection { input, schema, .. })
- | LogicalPlan::Aggregate(Aggregate { input, schema, .. }) => {
- let mut schemas = input.all_schemas();
- schemas.insert(0, schema);
- schemas
- }
- LogicalPlan::Join(Join {
- left,
- right,
- schema,
- ..
- })
- | LogicalPlan::CrossJoin(CrossJoin {
- left,
- right,
- schema,
- }) => {
- let mut schemas = left.all_schemas();
- schemas.extend(right.all_schemas());
- schemas.insert(0, schema);
- schemas
- }
- LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
- vec![schema]
- }
- LogicalPlan::Union(Union { schema, .. }) => {
- vec![schema]
- }
- LogicalPlan::Extension(extension) => vec![extension.node.schema()],
- LogicalPlan::Explain(Explain { schema, .. })
- | LogicalPlan::Analyze(Analyze { schema, .. })
- | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. })
- | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. })
- | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. })
- | LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => {
- vec![schema]
- }
- LogicalPlan::Limit(Limit { input, .. })
- | LogicalPlan::Repartition(Repartition { input, .. })
- | LogicalPlan::Sort(Sort { input, .. })
- | LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. })
- | LogicalPlan::Filter(Filter { input, .. }) => input.all_schemas(),
- LogicalPlan::DropTable(_) => vec![],
- }
- }
-
- /// Returns the (fixed) output schema for explain plans
- pub fn explain_schema() -> SchemaRef {
- SchemaRef::new(Schema::new(vec![
- Field::new("plan_type", DataType::Utf8, false),
- Field::new("plan", DataType::Utf8, false),
- ]))
- }
-
- /// returns all expressions (non-recursively) in the current
- /// logical plan node. This does not include expressions in any
- /// children
- pub fn expressions(self: &LogicalPlan) -> Vec<Expr> {
- match self {
- LogicalPlan::Projection(Projection { expr, .. }) => expr.clone(),
- LogicalPlan::Values(Values { values, .. }) => {
- values.iter().flatten().cloned().collect()
- }
- LogicalPlan::Filter(Filter { predicate, .. }) => vec![predicate.clone()],
- LogicalPlan::Repartition(Repartition {
- partitioning_scheme,
- ..
- }) => match partitioning_scheme {
- Partitioning::Hash(expr, _) => expr.clone(),
- _ => vec![],
- },
- LogicalPlan::Window(Window { window_expr, .. }) => window_expr.clone(),
- LogicalPlan::Aggregate(Aggregate {
- group_expr,
- aggr_expr,
- ..
- }) => group_expr.iter().chain(aggr_expr.iter()).cloned().collect(),
- LogicalPlan::Join(Join { on, .. }) => on
- .iter()
- .flat_map(|(l, r)| vec![Expr::Column(l.clone()), Expr::Column(r.clone())])
- .collect(),
- LogicalPlan::Sort(Sort { expr, .. }) => expr.clone(),
- LogicalPlan::Extension(extension) => extension.node.expressions(),
- // plans without expressions
- LogicalPlan::TableScan { .. }
- | LogicalPlan::EmptyRelation(_)
- | LogicalPlan::SubqueryAlias(_)
- | LogicalPlan::Limit(_)
- | LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::CreateMemoryTable(_)
- | LogicalPlan::CreateCatalogSchema(_)
- | LogicalPlan::CreateCatalog(_)
- | LogicalPlan::DropTable(_)
- | LogicalPlan::CrossJoin(_)
- | LogicalPlan::Analyze { .. }
- | LogicalPlan::Explain { .. }
- | LogicalPlan::Union(_) => {
- vec![]
- }
- }
- }
-
- /// returns all inputs of this `LogicalPlan` node. Does not
- /// include inputs to inputs.
- pub fn inputs(self: &LogicalPlan) -> Vec<&LogicalPlan> {
- match self {
- LogicalPlan::Projection(Projection { input, .. }) => vec![input],
- LogicalPlan::Filter(Filter { input, .. }) => vec![input],
- LogicalPlan::Repartition(Repartition { input, .. }) => vec![input],
- LogicalPlan::Window(Window { input, .. }) => vec![input],
- LogicalPlan::Aggregate(Aggregate { input, .. }) => vec![input],
- LogicalPlan::Sort(Sort { input, .. }) => vec![input],
- LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
- LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
- LogicalPlan::Limit(Limit { input, .. }) => vec![input],
- LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
- LogicalPlan::Extension(extension) => extension.node.inputs(),
- LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
- LogicalPlan::Explain(explain) => vec![&explain.plan],
- LogicalPlan::Analyze(analyze) => vec![&analyze.input],
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
- vec![input]
- }
- // plans without inputs
- LogicalPlan::TableScan { .. }
- | LogicalPlan::EmptyRelation { .. }
- | LogicalPlan::Values { .. }
- | LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::CreateCatalogSchema(_)
- | LogicalPlan::CreateCatalog(_)
- | LogicalPlan::DropTable(_) => vec![],
- }
- }
-
- /// returns all `Using` join columns in a logical plan
- pub fn using_columns(&self) -> Result<Vec<HashSet<Column>>, DataFusionError> {
- struct UsingJoinColumnVisitor {
- using_columns: Vec<HashSet<Column>>,
- }
+ visitor.pre_visit_plan("LogicalPlan")?;
+ self.0.accept(&mut visitor).unwrap();
+ visitor.post_visit_plan()?;
- impl PlanVisitor for UsingJoinColumnVisitor {
- type Error = DataFusionError;
+ visitor.set_with_schema(true);
+ visitor.pre_visit_plan("Detailed LogicalPlan")?;
+ self.0.accept(&mut visitor).unwrap();
+ visitor.post_visit_plan()?;
- fn pre_visit(&mut self, plan: &LogicalPlan) -> Result<bool, Self::Error> {
- if let LogicalPlan::Join(Join {
- join_constraint: JoinConstraint::Using,
- on,
- ..
- }) = plan
- {
- self.using_columns.push(
- on.iter()
- .flat_map(|entry| [&entry.0, &entry.1])
- .cloned()
- .collect::<HashSet<Column>>(),
- );
- }
- Ok(true)
+ writeln!(f, "}}")?;
+ writeln!(f, "// End DataFusion GraphViz Plan")?;
+ Ok(())
}
}
-
- let mut visitor = UsingJoinColumnVisitor {
- using_columns: vec![],
- };
- self.accept(&mut visitor)?;
- Ok(visitor.using_columns)
+ Wrapper(self)
}
-}
-/// Logical partitioning schemes supported by the repartition operator.
-#[derive(Debug, Clone)]
-pub enum Partitioning {
- /// Allocate batches using a round-robin algorithm and the specified number of partitions
- RoundRobinBatch(usize),
- /// Allocate rows based on a hash of one of more expressions and the specified number
- /// of partitions.
- /// This partitioning scheme is not yet fully supported. See <https://issues.apache.org/jira/browse/ARROW-11011>
- Hash(Vec<Expr>, usize),
-}
+ /// Return a `format`able structure with the a human readable
+ /// description of this LogicalPlan node per node, not including
+ /// children. For example:
+ ///
+ /// ```text
+ /// Projection: #id
+ /// ```
+ /// ```ignore
+ /// use arrow::datatypes::{Field, Schema, DataType};
+ /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
+ /// let schema = Schema::new(vec![
+ /// Field::new("id", DataType::Int32, false),
+ /// ]);
+ /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
+ /// .build().unwrap();
+ ///
+ /// // Format using display
+ /// let display_string = format!("{}", plan.display());
+ ///
+ /// assert_eq!("TableScan: foo.csv projection=None", display_string);
+ /// ```
+ pub fn display(&self) -> impl fmt::Display + '_ {
+ // Boilerplate structure to wrap LogicalPlan with something
+ // that that can be formatted
+ struct Wrapper<'a>(&'a LogicalPlan);
+ impl<'a> fmt::Display for Wrapper<'a> {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match &*self.0 {
+ LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
+ LogicalPlan::Values(Values { ref values, .. }) => {
+ let str_values: Vec<_> = values
+ .iter()
+ // limit to only 5 values to avoid horrible display
+ .take(5)
+ .map(|row| {
+ let item = row
+ .iter()
+ .map(|expr| expr.to_string())
+ .collect::<Vec<_>>()
+ .join(", ");
+ format!("({})", item)
+ })
+ .collect();
-/// Trait that implements the [Visitor
-/// pattern](https://en.wikipedia.org/wiki/Visitor_pattern) for a
-/// depth first walk of `LogicalPlan` nodes. `pre_visit` is called
-/// before any children are visited, and then `post_visit` is called
-/// after all children have been visited.
-////
-/// To use, define a struct that implements this trait and then invoke
-/// [`LogicalPlan::accept`].
-///
-/// For example, for a logical plan like:
-///
-/// ```text
-/// Projection: #id
-/// Filter: #state Eq Utf8(\"CO\")\
-/// CsvScan: employee.csv projection=Some([0, 3])";
-/// ```
-///
-/// The sequence of visit operations would be:
-/// ```text
-/// visitor.pre_visit(Projection)
-/// visitor.pre_visit(Filter)
-/// visitor.pre_visit(CsvScan)
-/// visitor.post_visit(CsvScan)
-/// visitor.post_visit(Filter)
-/// visitor.post_visit(Projection)
-/// ```
-pub trait PlanVisitor {
- /// The type of error returned by this visitor
- type Error;
+ let elipse = if values.len() > 5 { "..." } else { "" };
+ write!(f, "Values: {}{}", str_values.join(", "), elipse)
+ }
- /// Invoked on a logical plan before any of its child inputs have been
- /// visited. If Ok(true) is returned, the recursion continues. If
- /// Err(..) or Ok(false) are returned, the recursion stops
- /// immediately and the error, if any, is returned to `accept`
- fn pre_visit(&mut self, plan: &LogicalPlan)
- -> std::result::Result<bool, Self::Error>;
+ LogicalPlan::TableScan(TableScan {
+ ref source,
+ ref table_name,
+ ref projection,
+ ref filters,
+ ref limit,
+ ..
+ }) => {
+ write!(
+ f,
+ "TableScan: {} projection={:?}",
+ table_name, projection
+ )?;
- /// Invoked on a logical plan after all of its child inputs have
- /// been visited. The return value is handled the same as the
- /// return value of `pre_visit`. The provided default implementation
- /// returns `Ok(true)`.
- fn post_visit(
- &mut self,
- _plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- Ok(true)
- }
-}
+ if !filters.is_empty() {
+ let mut full_filter = vec![];
+ let mut partial_filter = vec![];
+ let mut unsupported_filters = vec![];
-impl LogicalPlan {
- /// returns all inputs in the logical plan. Returns Ok(true) if
- /// all nodes were visited, and Ok(false) if any call to
- /// `pre_visit` or `post_visit` returned Ok(false) and may have
- /// cut short the recursion
- pub fn accept<V>(&self, visitor: &mut V) -> std::result::Result<bool, V::Error>
- where
- V: PlanVisitor,
- {
- if !visitor.pre_visit(self)? {
- return Ok(false);
- }
+ filters.iter().for_each(|x| {
+ if let Ok(t) = source.supports_filter_pushdown(x) {
+ match t {
+ TableProviderFilterPushDown::Exact => {
+ full_filter.push(x)
+ }
+ TableProviderFilterPushDown::Inexact => {
+ partial_filter.push(x)
+ }
+ TableProviderFilterPushDown::Unsupported => {
+ unsupported_filters.push(x)
+ }
+ }
+ }
+ });
- let recurse = match self {
- LogicalPlan::Projection(Projection { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Filter(Filter { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Repartition(Repartition { input, .. }) => {
- input.accept(visitor)?
- }
- LogicalPlan::Window(Window { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Aggregate(Aggregate { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Sort(Sort { input, .. }) => input.accept(visitor)?,
- LogicalPlan::Join(Join { left, right, .. })
- | LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => {
- left.accept(visitor)? && right.accept(visitor)?
- }
- LogicalPlan::Union(Union { inputs, .. }) => {
- for input in inputs {
- if !input.accept(visitor)? {
- return Ok(false);
+ if !full_filter.is_empty() {
+ write!(f, ", full_filters={:?}", full_filter)?;
+ };
+ if !partial_filter.is_empty() {
+ write!(f, ", partial_filters={:?}", partial_filter)?;
+ }
+ if !unsupported_filters.is_empty() {
+ write!(
+ f,
+ ", unsupported_filters={:?}",
+ unsupported_filters
+ )?;
+ }
+ }
+
+ if let Some(n) = limit {
+ write!(f, ", limit={}", n)?;
+ }
+
+ Ok(())
}
- }
- true
- }
- LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
- LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
- input.accept(visitor)?
- }
- LogicalPlan::CreateMemoryTable(CreateMemoryTable { input, .. }) => {
- input.accept(visitor)?
- }
- LogicalPlan::Extension(extension) => {
- for input in extension.node.inputs() {
- if !input.accept(visitor)? {
- return Ok(false);
+ LogicalPlan::Projection(Projection {
+ ref expr, alias, ..
+ }) => {
+ write!(f, "Projection: ")?;
+ for (i, expr_item) in expr.iter().enumerate() {
+ if i > 0 {
+ write!(f, ", ")?;
+ }
+ write!(f, "{:?}", expr_item)?;
+ }
+ if let Some(a) = alias {
+ write!(f, ", alias={}", a)?;
+ }
+ Ok(())
}
+ LogicalPlan::Filter(Filter {
+ predicate: ref expr,
+ ..
+ }) => write!(f, "Filter: {:?}", expr),
+ LogicalPlan::Window(Window {
+ ref window_expr, ..
+ }) => {
+ write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
+ }
+ LogicalPlan::Aggregate(Aggregate {
+ ref group_expr,
+ ref aggr_expr,
+ ..
+ }) => write!(
+ f,
+ "Aggregate: groupBy=[{:?}], aggr=[{:?}]",
+ group_expr, aggr_expr
+ ),
+ LogicalPlan::Sort(Sort { expr, .. }) => {
+ write!(f, "Sort: ")?;
+ for (i, expr_item) in expr.iter().enumerate() {
+ if i > 0 {
+ write!(f, ", ")?;
+ }
+ write!(f, "{:?}", expr_item)?;
+ }
+ Ok(())
+ }
+ LogicalPlan::Join(Join {
+ on: ref keys,
+ join_constraint,
+ join_type,
+ ..
+ }) => {
+ let join_expr: Vec<String> =
+ keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
+ match join_constraint {
+ JoinConstraint::On => {
+ write!(f, "{} Join: {}", join_type, join_expr.join(", "))
+ }
+ JoinConstraint::Using => {
+ write!(
+ f,
+ "{} Join: Using {}",
+ join_type,
+ join_expr.join(", ")
+ )
+ }
+ }
+ }
+ LogicalPlan::CrossJoin(_) => {
+ write!(f, "CrossJoin:")
+ }
+ LogicalPlan::Repartition(Repartition {
+ partitioning_scheme,
+ ..
+ }) => match partitioning_scheme {
+ Partitioning::RoundRobinBatch(n) => write!(
+ f,
+ "Repartition: RoundRobinBatch partition_count={}",
+ n
+ ),
+ Partitioning::Hash(expr, n) => {
+ let hash_expr: Vec<String> =
+ expr.iter().map(|e| format!("{:?}", e)).collect();
+ write!(
+ f,
+ "Repartition: Hash({}) partition_count={}",
+ hash_expr.join(", "),
+ n
+ )
+ }
+ },
+ LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
+ LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
+ write!(f, "SubqueryAlias: {}", alias)
+ }
+ LogicalPlan::CreateExternalTable(CreateExternalTable {
+ ref name,
+ ..
+ }) => {
+ write!(f, "CreateExternalTable: {:?}", name)
+ }
+ LogicalPlan::CreateMemoryTable(CreateMemoryTable {
+ name, ..
+ }) => {
+ write!(f, "CreateMemoryTable: {:?}", name)
+ }
+ LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
+ schema_name,
+ ..
+ }) => {
+ write!(f, "CreateCatalogSchema: {:?}", schema_name)
+ }
+ LogicalPlan::CreateCatalog(CreateCatalog {
+ catalog_name, ..
+ }) => {
+ write!(f, "CreateCatalog: {:?}", catalog_name)
+ }
+ LogicalPlan::DropTable(DropTable {
+ name, if_exists, ..
+ }) => {
+ write!(f, "DropTable: {:?} if not exist:={}", name, if_exists)
+ }
+ LogicalPlan::Explain { .. } => write!(f, "Explain"),
+ LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
+ LogicalPlan::Union(_) => write!(f, "Union"),
+ LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
}
- true
}
- LogicalPlan::Explain(explain) => explain.plan.accept(visitor)?,
- LogicalPlan::Analyze(analyze) => analyze.input.accept(visitor)?,
- // plans without inputs
- LogicalPlan::TableScan { .. }
- | LogicalPlan::EmptyRelation(_)
- | LogicalPlan::Values(_)
- | LogicalPlan::CreateExternalTable(_)
- | LogicalPlan::CreateCatalogSchema(_)
- | LogicalPlan::CreateCatalog(_)
- | LogicalPlan::DropTable(_) => true,
- };
- if !recurse {
- return Ok(false);
- }
-
- if !visitor.post_visit(self)? {
- return Ok(false);
}
+ Wrapper(self)
+ }
+}
- Ok(true)
+impl fmt::Debug for LogicalPlan {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ self.display_indent().fmt(f)
}
}
-// Various implementations for printing out LogicalPlans
-impl LogicalPlan {
- /// Return a `format`able structure that produces a single line
- /// per node. For example:
- ///
- /// ```text
- /// Projection: #employee.id
- /// Filter: #employee.state Eq Utf8(\"CO\")\
- /// CsvScan: employee projection=Some([0, 3])
- /// ```
- ///
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
- /// .filter(col("id").eq(lit(5))).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display_indent
- /// let display_string = format!("{}", plan.display_indent());
- ///
- /// assert_eq!("Filter: #foo_csv.id = Int32(5)\
- /// \n TableScan: foo_csv projection=None",
- /// display_string);
- /// ```
- pub fn display_indent(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let with_schema = false;
- let mut visitor = IndentVisitor::new(f, with_schema);
- self.0.accept(&mut visitor).unwrap();
- Ok(())
- }
- }
- Wrapper(self)
+impl ToStringifiedPlan for LogicalPlan {
+ fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
+ StringifiedPlan::new(plan_type, self.display_indent().to_string())
}
+}
- /// Return a `format`able structure that produces a single line
- /// per node that includes the output schema. For example:
- ///
- /// ```text
- /// Projection: #employee.id [id:Int32]\
- /// Filter: #employee.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
- /// TableScan: employee projection=Some([0, 3]) [id:Int32, state:Utf8]";
- /// ```
- ///
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo_csv"), &schema, None).unwrap()
- /// .filter(col("id").eq(lit(5))).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display_indent_schema
- /// let display_string = format!("{}", plan.display_indent_schema());
- ///
- /// assert_eq!("Filter: #foo_csv.id = Int32(5) [id:Int32]\
- /// \n TableScan: foo_csv projection=None [id:Int32]",
- /// display_string);
- /// ```
- pub fn display_indent_schema(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- let with_schema = true;
- let mut visitor = IndentVisitor::new(f, with_schema);
- self.0.accept(&mut visitor).unwrap();
- Ok(())
- }
- }
- Wrapper(self)
+/// Join type
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum JoinType {
+ /// Inner Join
+ Inner,
+ /// Left Join
+ Left,
+ /// Right Join
+ Right,
+ /// Full Join
+ Full,
+ /// Semi Join
+ Semi,
+ /// Anti Join
+ Anti,
+}
+
+impl Display for JoinType {
+ fn fmt(&self, f: &mut Formatter) -> fmt::Result {
+ let join_type = match self {
+ JoinType::Inner => "Inner",
+ JoinType::Left => "Left",
+ JoinType::Right => "Right",
+ JoinType::Full => "Full",
+ JoinType::Semi => "Semi",
+ JoinType::Anti => "Anti",
+ };
+ write!(f, "{}", join_type)
}
+}
- /// Return a `format`able structure that produces lines meant for
- /// graphical display using the `DOT` language. This format can be
- /// visualized using software from
- /// [`graphviz`](https://graphviz.org/)
- ///
- /// This currently produces two graphs -- one with the basic
- /// structure, and one with additional details such as schema.
- ///
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
- /// .filter(col("id").eq(lit(5))).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display_graphviz
- /// let graphviz_string = format!("{}", plan.display_graphviz());
- /// ```
- ///
- /// If graphviz string is saved to a file such as `/tmp/example.dot`, the following
- /// commands can be used to render it as a pdf:
- ///
- /// ```bash
- /// dot -Tpdf < /tmp/example.dot > /tmp/example.pdf
- /// ```
- ///
- pub fn display_graphviz(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- writeln!(
- f,
- "// Begin DataFusion GraphViz Plan (see https://graphviz.org)"
- )?;
- writeln!(f, "digraph {{")?;
+/// Join constraint
+#[derive(Debug, Clone, Copy)]
+pub enum JoinConstraint {
+ /// Join ON
+ On,
+ /// Join USING
+ Using,
+}
+
+/// Creates a catalog (aka "Database").
+#[derive(Clone)]
+pub struct CreateCatalog {
+ /// The catalog name
+ pub catalog_name: String,
+ /// Do nothing (except issuing a notice) if a schema with the same name already exists
+ pub if_not_exists: bool,
+ /// Empty schema
+ pub schema: DFSchemaRef,
+}
+
+/// Creates a schema.
+#[derive(Clone)]
+pub struct CreateCatalogSchema {
+ /// The table schema
+ pub schema_name: String,
+ /// Do nothing (except issuing a notice) if a schema with the same name already exists
+ pub if_not_exists: bool,
+ /// Empty schema
+ pub schema: DFSchemaRef,
+}
+
+/// Drops a table.
+#[derive(Clone)]
+pub struct DropTable {
+ /// The table name
+ pub name: String,
+ /// If the table exists
+ pub if_exists: bool,
+ /// Dummy schema
+ pub schema: DFSchemaRef,
+}
+
+/// Produces no rows: An empty relation with an empty schema
+#[derive(Clone)]
+pub struct EmptyRelation {
+ /// Whether to produce a placeholder row
+ pub produce_one_row: bool,
+ /// The schema description of the output
+ pub schema: DFSchemaRef,
+}
+
+/// Values expression. See
+/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
+/// documentation for more details.
+#[derive(Clone)]
+pub struct Values {
+ /// The table schema
+ pub schema: DFSchemaRef,
+ /// Values
+ pub values: Vec<Vec<Expr>>,
+}
+
+/// Evaluates an arbitrary list of expressions (essentially a
+/// SELECT with an expression list) on its input.
+#[derive(Clone)]
+pub struct Projection {
+ /// The list of expressions
+ pub expr: Vec<Expr>,
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// The schema description of the output
+ pub schema: DFSchemaRef,
+ /// Projection output relation alias
+ pub alias: Option<String>,
+}
+
+/// Aliased subquery
+#[derive(Clone)]
+pub struct SubqueryAlias {
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// The alias for the input relation
+ pub alias: String,
+ /// The schema with qualified field names
+ pub schema: DFSchemaRef,
+}
+
+/// Filters rows from its input that do not match an
+/// expression (essentially a WHERE clause with a predicate
+/// expression).
+///
+/// Semantically, `<predicate>` is evaluated for each row of the input;
+/// If the value of `<predicate>` is true, the input row is passed to
+/// the output. If the value of `<predicate>` is false, the row is
+/// discarded.
+#[derive(Clone)]
+pub struct Filter {
+ /// The predicate expression, which must have Boolean type.
+ pub predicate: Expr,
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+}
- let mut visitor = GraphvizVisitor::new(f);
+/// Window its input based on a set of window spec and window function (e.g. SUM or RANK)
+#[derive(Clone)]
+pub struct Window {
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// The window function expression
+ pub window_expr: Vec<Expr>,
+ /// The schema description of the window output
+ pub schema: DFSchemaRef,
+}
- visitor.pre_visit_plan("LogicalPlan")?;
- self.0.accept(&mut visitor).unwrap();
- visitor.post_visit_plan()?;
+/// Produces rows from a table provider by reference or from the context
+#[derive(Clone)]
+pub struct TableScan {
+ /// The name of the table
+ pub table_name: String,
+ /// The source of the table
+ pub source: Arc<dyn TableSource>,
+ /// Optional column indices to use as a projection
+ pub projection: Option<Vec<usize>>,
+ /// The schema description of the output
+ pub projected_schema: DFSchemaRef,
+ /// Optional expressions to be used as filters by the table provider
+ pub filters: Vec<Expr>,
+ /// Optional limit to skip reading
+ pub limit: Option<usize>,
+}
- visitor.set_with_schema(true);
- visitor.pre_visit_plan("Detailed LogicalPlan")?;
- self.0.accept(&mut visitor).unwrap();
- visitor.post_visit_plan()?;
+/// Apply Cross Join to two logical plans
+#[derive(Clone)]
+pub struct CrossJoin {
+ /// Left input
+ pub left: Arc<LogicalPlan>,
+ /// Right input
+ pub right: Arc<LogicalPlan>,
+ /// The output schema, containing fields from the left and right inputs
+ pub schema: DFSchemaRef,
+}
- writeln!(f, "}}")?;
- writeln!(f, "// End DataFusion GraphViz Plan")?;
- Ok(())
- }
- }
- Wrapper(self)
- }
+/// Repartition the plan based on a partitioning scheme.
+#[derive(Clone)]
+pub struct Repartition {
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// The partitioning scheme
+ pub partitioning_scheme: Partitioning,
+}
- /// Return a `format`able structure with the a human readable
- /// description of this LogicalPlan node per node, not including
- /// children. For example:
- ///
- /// ```text
- /// Projection: #id
- /// ```
- /// ```
- /// use arrow::datatypes::{Field, Schema, DataType};
- /// use datafusion::logical_plan::{lit, col, LogicalPlanBuilder};
- /// let schema = Schema::new(vec![
- /// Field::new("id", DataType::Int32, false),
- /// ]);
- /// let plan = LogicalPlanBuilder::scan_empty(Some("foo.csv"), &schema, None).unwrap()
- /// .build().unwrap();
- ///
- /// // Format using display
- /// let display_string = format!("{}", plan.display());
- ///
- /// assert_eq!("TableScan: foo.csv projection=None", display_string);
- /// ```
- pub fn display(&self) -> impl fmt::Display + '_ {
- // Boilerplate structure to wrap LogicalPlan with something
- // that that can be formatted
- struct Wrapper<'a>(&'a LogicalPlan);
- impl<'a> fmt::Display for Wrapper<'a> {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- match &*self.0 {
- LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
- LogicalPlan::Values(Values { ref values, .. }) => {
- let str_values: Vec<_> = values
- .iter()
- // limit to only 5 values to avoid horrible display
- .take(5)
- .map(|row| {
- let item = row
- .iter()
- .map(|expr| expr.to_string())
- .collect::<Vec<_>>()
- .join(", ");
- format!("({})", item)
- })
- .collect();
+/// Union multiple inputs
+#[derive(Clone)]
+pub struct Union {
+ /// Inputs to merge
+ pub inputs: Vec<LogicalPlan>,
+ /// Union schema. Should be the same for all inputs.
+ pub schema: DFSchemaRef,
+ /// Union output relation alias
+ pub alias: Option<String>,
+}
- let elipse = if values.len() > 5 { "..." } else { "" };
- write!(f, "Values: {}{}", str_values.join(", "), elipse)
- }
+/// Creates an in memory table.
+#[derive(Clone)]
+pub struct CreateMemoryTable {
+ /// The table name
+ pub name: String,
+ /// The logical plan
+ pub input: Arc<LogicalPlan>,
+ /// Option to not error if table already exists
+ pub if_not_exists: bool,
+}
- LogicalPlan::TableScan(TableScan {
- ref source,
- ref table_name,
- ref projection,
- ref filters,
- ref limit,
- ..
- }) => {
- write!(
- f,
- "TableScan: {} projection={:?}",
- table_name, projection
- )?;
+/// Types of files to parse as DataFrames
+#[derive(Debug, Clone, Copy, PartialEq)]
+pub enum FileType {
+ /// Newline-delimited JSON
+ NdJson,
+ /// Apache Parquet columnar storage
+ Parquet,
+ /// Comma separated values
+ CSV,
+ /// Avro binary records
+ Avro,
+}
- if !filters.is_empty() {
- let mut full_filter = vec![];
- let mut partial_filter = vec![];
- let mut unsupported_filters = vec![];
+/// Creates an external table.
+#[derive(Clone)]
+pub struct CreateExternalTable {
+ /// The table schema
+ pub schema: DFSchemaRef,
+ /// The table name
+ pub name: String,
+ /// The physical location
+ pub location: String,
+ /// The file type of physical file
+ pub file_type: FileType,
+ /// Whether the CSV file contains a header
+ pub has_header: bool,
+ /// Delimiter for CSV
+ pub delimiter: char,
+ /// Partition Columns
+ pub table_partition_cols: Vec<String>,
+ /// Option to not error if table already exists
+ pub if_not_exists: bool,
+}
- filters.iter().for_each(|x| {
- if let Ok(t) = source.supports_filter_pushdown(x) {
- match t {
- TableProviderFilterPushDown::Exact => {
- full_filter.push(x)
- }
- TableProviderFilterPushDown::Inexact => {
- partial_filter.push(x)
- }
- TableProviderFilterPushDown::Unsupported => {
- unsupported_filters.push(x)
- }
- }
- }
- });
+/// Produces a relation with string representations of
+/// various parts of the plan
+#[derive(Clone)]
+pub struct Explain {
+ /// Should extra (detailed, intermediate plans) be included?
+ pub verbose: bool,
+ /// The logical plan that is being EXPLAIN'd
+ pub plan: Arc<LogicalPlan>,
+ /// Represent the various stages plans have gone through
+ pub stringified_plans: Vec<StringifiedPlan>,
+ /// The output schema of the explain (2 columns of text)
+ pub schema: DFSchemaRef,
+}
- if !full_filter.is_empty() {
- write!(f, ", full_filters={:?}", full_filter)?;
- };
- if !partial_filter.is_empty() {
- write!(f, ", partial_filters={:?}", partial_filter)?;
- }
- if !unsupported_filters.is_empty() {
- write!(
- f,
- ", unsupported_filters={:?}",
- unsupported_filters
- )?;
- }
- }
+/// Runs the actual plan, and then prints the physical plan with
+/// with execution metrics.
+#[derive(Clone)]
+pub struct Analyze {
+ /// Should extra detail be included?
+ pub verbose: bool,
+ /// The logical plan that is being EXPLAIN ANALYZE'd
+ pub input: Arc<LogicalPlan>,
+ /// The output schema of the explain (2 columns of text)
+ pub schema: DFSchemaRef,
+}
+
+/// Extension operator defined outside of DataFusion
+#[derive(Clone)]
+pub struct Extension {
+ /// The runtime extension operator
+ pub node: Arc<dyn UserDefinedLogicalNode + Send + Sync>,
+}
+
+/// Produces the first `n` tuples from its input and discards the rest.
+#[derive(Clone)]
+pub struct Limit {
+ /// The limit
+ pub n: usize,
+ /// The logical plan
+ pub input: Arc<LogicalPlan>,
+}
+
+/// Aggregates its input based on a set of grouping and aggregate
+/// expressions (e.g. SUM).
+#[derive(Clone)]
+pub struct Aggregate {
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+ /// Grouping expressions
+ pub group_expr: Vec<Expr>,
+ /// Aggregate expressions
+ pub aggr_expr: Vec<Expr>,
+ /// The schema description of the aggregate output
+ pub schema: DFSchemaRef,
+}
- if let Some(n) = limit {
- write!(f, ", limit={}", n)?;
- }
+/// Sorts its input according to a list of sort expressions.
+#[derive(Clone)]
+pub struct Sort {
+ /// The sort expressions
+ pub expr: Vec<Expr>,
+ /// The incoming logical plan
+ pub input: Arc<LogicalPlan>,
+}
- Ok(())
- }
- LogicalPlan::Projection(Projection {
- ref expr, alias, ..
- }) => {
- write!(f, "Projection: ")?;
- for (i, expr_item) in expr.iter().enumerate() {
- if i > 0 {
- write!(f, ", ")?;
- }
- write!(f, "{:?}", expr_item)?;
- }
- if let Some(a) = alias {
- write!(f, ", alias={}", a)?;
- }
- Ok(())
- }
- LogicalPlan::Filter(Filter {
- predicate: ref expr,
- ..
- }) => write!(f, "Filter: {:?}", expr),
- LogicalPlan::Window(Window {
- ref window_expr, ..
- }) => {
- write!(f, "WindowAggr: windowExpr=[{:?}]", window_expr)
- }
- LogicalPlan::Aggregate(Aggregate {
- ref group_expr,
- ref aggr_expr,
- ..
- }) => write!(
- f,
- "Aggregate: groupBy=[{:?}], aggr=[{:?}]",
- group_expr, aggr_expr
- ),
- LogicalPlan::Sort(Sort { expr, .. }) => {
- write!(f, "Sort: ")?;
- for (i, expr_item) in expr.iter().enumerate() {
- if i > 0 {
- write!(f, ", ")?;
- }
- write!(f, "{:?}", expr_item)?;
- }
- Ok(())
- }
- LogicalPlan::Join(Join {
- on: ref keys,
- join_constraint,
- join_type,
- ..
- }) => {
- let join_expr: Vec<String> =
- keys.iter().map(|(l, r)| format!("{} = {}", l, r)).collect();
- match join_constraint {
- JoinConstraint::On => {
- write!(f, "{} Join: {}", join_type, join_expr.join(", "))
- }
- JoinConstraint::Using => {
- write!(
- f,
- "{} Join: Using {}",
- join_type,
- join_expr.join(", ")
- )
- }
- }
- }
- LogicalPlan::CrossJoin(_) => {
- write!(f, "CrossJoin:")
- }
- LogicalPlan::Repartition(Repartition {
- partitioning_scheme,
- ..
- }) => match partitioning_scheme {
- Partitioning::RoundRobinBatch(n) => write!(
- f,
- "Repartition: RoundRobinBatch partition_count={}",
- n
- ),
- Partitioning::Hash(expr, n) => {
- let hash_expr: Vec<String> =
- expr.iter().map(|e| format!("{:?}", e)).collect();
- write!(
- f,
- "Repartition: Hash({}) partition_count={}",
- hash_expr.join(", "),
- n
- )
- }
- },
- LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
- LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
- write!(f, "SubqueryAlias: {}", alias)
- }
- LogicalPlan::CreateExternalTable(CreateExternalTable {
- ref name,
- ..
- }) => {
- write!(f, "CreateExternalTable: {:?}", name)
- }
- LogicalPlan::CreateMemoryTable(CreateMemoryTable {
- name, ..
- }) => {
- write!(f, "CreateMemoryTable: {:?}", name)
- }
- LogicalPlan::CreateCatalogSchema(CreateCatalogSchema {
- schema_name,
- ..
- }) => {
- write!(f, "CreateCatalogSchema: {:?}", schema_name)
- }
- LogicalPlan::CreateCatalog(CreateCatalog {
- catalog_name, ..
- }) => {
- write!(f, "CreateCatalog: {:?}", catalog_name)
- }
- LogicalPlan::DropTable(DropTable {
- name, if_exists, ..
- }) => {
- write!(f, "DropTable: {:?} if not exist:={}", name, if_exists)
- }
- LogicalPlan::Explain { .. } => write!(f, "Explain"),
- LogicalPlan::Analyze { .. } => write!(f, "Analyze"),
- LogicalPlan::Union(_) => write!(f, "Union"),
- LogicalPlan::Extension(e) => e.node.fmt_for_explain(f),
- }
- }
- }
- Wrapper(self)
- }
+/// Join two logical plans on one or more join columns
+#[derive(Clone)]
+pub struct Join {
+ /// Left input
+ pub left: Arc<LogicalPlan>,
+ /// Right input
+ pub right: Arc<LogicalPlan>,
+ /// Equijoin clause expressed as pairs of (left, right) join columns
+ pub on: Vec<(Column, Column)>,
+ /// Join type
+ pub join_type: JoinType,
+ /// Join constraint
+ pub join_constraint: JoinConstraint,
+ /// The output schema, containing fields from the left and right inputs
+ pub schema: DFSchemaRef,
+ /// If null_equals_null is true, null == null else null != null
+ pub null_equals_null: bool,
}
-impl fmt::Debug for LogicalPlan {
- fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
- self.display_indent().fmt(f)
- }
+/// Logical partitioning schemes supported by the repartition operator.
+#[derive(Debug, Clone)]
+pub enum Partitioning {
+ /// Allocate batches using a round-robin algorithm and the specified number of partitions
+ RoundRobinBatch(usize),
+ /// Allocate rows based on a hash of one of more expressions and the specified number
+ /// of partitions.
+ /// This partitioning scheme is not yet fully supported. See <https://issues.apache.org/jira/browse/ARROW-11011>
+ Hash(Vec<Expr>, usize),
}
/// Represents which type of plan, when storing multiple
@@ -1293,345 +1228,3 @@ pub trait ToStringifiedPlan {
/// Create a stringified plan with the specified type
fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan;
}
-
-impl ToStringifiedPlan for LogicalPlan {
- fn to_stringified(&self, plan_type: PlanType) -> StringifiedPlan {
- StringifiedPlan::new(plan_type, self.display_indent().to_string())
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::super::{col, lit, LogicalPlanBuilder};
- use super::*;
-
- fn employee_schema() -> Schema {
- Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("first_name", DataType::Utf8, false),
- Field::new("last_name", DataType::Utf8, false),
- Field::new("state", DataType::Utf8, false),
- Field::new("salary", DataType::Int32, false),
- ])
- }
-
- fn display_plan() -> LogicalPlan {
- LogicalPlanBuilder::scan_empty(
- Some("employee_csv"),
- &employee_schema(),
- Some(vec![0, 3]),
- )
- .unwrap()
- .filter(col("state").eq(lit("CO")))
- .unwrap()
- .project(vec![col("id")])
- .unwrap()
- .build()
- .unwrap()
- }
-
- #[test]
- fn test_display_indent() {
- let plan = display_plan();
-
- let expected = "Projection: #employee_csv.id\
- \n Filter: #employee_csv.state = Utf8(\"CO\")\
- \n TableScan: employee_csv projection=Some([0, 3])";
-
- assert_eq!(expected, format!("{}", plan.display_indent()));
- }
-
- #[test]
- fn test_display_indent_schema() {
- let plan = display_plan();
-
- let expected = "Projection: #employee_csv.id [id:Int32]\
- \n Filter: #employee_csv.state = Utf8(\"CO\") [id:Int32, state:Utf8]\
- \n TableScan: employee_csv projection=Some([0, 3]) [id:Int32, state:Utf8]";
-
- assert_eq!(expected, format!("{}", plan.display_indent_schema()));
- }
-
- #[test]
- fn test_display_graphviz() {
- let plan = display_plan();
-
- // just test for a few key lines in the output rather than the
- // whole thing to make test mainteance easier.
- let graphviz = format!("{}", plan.display_graphviz());
-
- assert!(
- graphviz.contains(
- r#"// Begin DataFusion GraphViz Plan (see https://graphviz.org)"#
- ),
- "\n{}",
- plan.display_graphviz()
- );
- assert!(
- graphviz.contains(
- r#"[shape=box label="TableScan: employee_csv projection=Some([0, 3])"]"#
- ),
- "\n{}",
- plan.display_graphviz()
- );
- assert!(graphviz.contains(r#"[shape=box label="TableScan: employee_csv projection=Some([0, 3])\nSchema: [id:Int32, state:Utf8]"]"#),
- "\n{}", plan.display_graphviz());
- assert!(
- graphviz.contains(r#"// End DataFusion GraphViz Plan"#),
- "\n{}",
- plan.display_graphviz()
- );
- }
-
- /// Tests for the Visitor trait and walking logical plan nodes
- #[derive(Debug, Default)]
- struct OkVisitor {
- strings: Vec<String>,
- }
-
- impl PlanVisitor for OkVisitor {
- type Error = String;
-
- fn pre_visit(
- &mut self,
- plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- let s = match plan {
- LogicalPlan::Projection { .. } => "pre_visit Projection",
- LogicalPlan::Filter { .. } => "pre_visit Filter",
- LogicalPlan::TableScan { .. } => "pre_visit TableScan",
- _ => unimplemented!("unknown plan type"),
- };
-
- self.strings.push(s.into());
- Ok(true)
- }
-
- fn post_visit(
- &mut self,
- plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- let s = match plan {
- LogicalPlan::Projection { .. } => "post_visit Projection",
- LogicalPlan::Filter { .. } => "post_visit Filter",
- LogicalPlan::TableScan { .. } => "post_visit TableScan",
- _ => unimplemented!("unknown plan type"),
- };
-
- self.strings.push(s.into());
- Ok(true)
- }
- }
-
- #[test]
- fn visit_order() {
- let mut visitor = OkVisitor::default();
- let plan = test_plan();
- let res = plan.accept(&mut visitor);
- assert!(res.is_ok());
-
- assert_eq!(
- visitor.strings,
- vec![
- "pre_visit Projection",
- "pre_visit Filter",
- "pre_visit TableScan",
- "post_visit TableScan",
- "post_visit Filter",
- "post_visit Projection",
- ]
- );
- }
-
- #[derive(Debug, Default)]
- /// Counter than counts to zero and returns true when it gets there
- struct OptionalCounter {
- val: Option<usize>,
- }
-
- impl OptionalCounter {
- fn new(val: usize) -> Self {
- Self { val: Some(val) }
- }
- // Decrements the counter by 1, if any, returning true if it hits zero
- fn dec(&mut self) -> bool {
- if Some(0) == self.val {
- true
- } else {
- self.val = self.val.take().map(|i| i - 1);
- false
- }
- }
- }
-
- #[derive(Debug, Default)]
- /// Visitor that returns false after some number of visits
- struct StoppingVisitor {
- inner: OkVisitor,
- /// When Some(0) returns false from pre_visit
- return_false_from_pre_in: OptionalCounter,
- /// When Some(0) returns false from post_visit
- return_false_from_post_in: OptionalCounter,
- }
-
- impl PlanVisitor for StoppingVisitor {
- type Error = String;
-
- fn pre_visit(
- &mut self,
- plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- if self.return_false_from_pre_in.dec() {
- return Ok(false);
- }
- self.inner.pre_visit(plan)
- }
-
- fn post_visit(
- &mut self,
- plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- if self.return_false_from_post_in.dec() {
- return Ok(false);
- }
-
- self.inner.post_visit(plan)
- }
- }
-
- /// test early stopping in pre-visit
- #[test]
- fn early_stopping_pre_visit() {
- let mut visitor = StoppingVisitor {
- return_false_from_pre_in: OptionalCounter::new(2),
- ..Default::default()
- };
- let plan = test_plan();
- let res = plan.accept(&mut visitor);
- assert!(res.is_ok());
-
- assert_eq!(
- visitor.inner.strings,
- vec!["pre_visit Projection", "pre_visit Filter"]
- );
- }
-
- #[test]
- fn early_stopping_post_visit() {
- let mut visitor = StoppingVisitor {
- return_false_from_post_in: OptionalCounter::new(1),
- ..Default::default()
- };
- let plan = test_plan();
- let res = plan.accept(&mut visitor);
- assert!(res.is_ok());
-
- assert_eq!(
- visitor.inner.strings,
- vec![
- "pre_visit Projection",
- "pre_visit Filter",
- "pre_visit TableScan",
- "post_visit TableScan",
- ]
- );
- }
-
- #[derive(Debug, Default)]
- /// Visitor that returns an error after some number of visits
- struct ErrorVisitor {
- inner: OkVisitor,
- /// When Some(0) returns false from pre_visit
- return_error_from_pre_in: OptionalCounter,
- /// When Some(0) returns false from post_visit
- return_error_from_post_in: OptionalCounter,
- }
-
- impl PlanVisitor for ErrorVisitor {
- type Error = String;
-
- fn pre_visit(
- &mut self,
- plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- if self.return_error_from_pre_in.dec() {
- return Err("Error in pre_visit".into());
- }
-
- self.inner.pre_visit(plan)
- }
-
- fn post_visit(
- &mut self,
- plan: &LogicalPlan,
- ) -> std::result::Result<bool, Self::Error> {
- if self.return_error_from_post_in.dec() {
- return Err("Error in post_visit".into());
- }
-
- self.inner.post_visit(plan)
- }
- }
-
- #[test]
- fn error_pre_visit() {
- let mut visitor = ErrorVisitor {
- return_error_from_pre_in: OptionalCounter::new(2),
- ..Default::default()
- };
- let plan = test_plan();
- let res = plan.accept(&mut visitor);
-
- if let Err(e) = res {
- assert_eq!("Error in pre_visit", e);
- } else {
- panic!("Expected an error");
- }
-
- assert_eq!(
- visitor.inner.strings,
- vec!["pre_visit Projection", "pre_visit Filter"]
- );
- }
-
- #[test]
- fn error_post_visit() {
- let mut visitor = ErrorVisitor {
- return_error_from_post_in: OptionalCounter::new(1),
- ..Default::default()
- };
- let plan = test_plan();
- let res = plan.accept(&mut visitor);
- if let Err(e) = res {
- assert_eq!("Error in post_visit", e);
- } else {
- panic!("Expected an error");
- }
-
- assert_eq!(
- visitor.inner.strings,
- vec![
- "pre_visit Projection",
- "pre_visit Filter",
- "pre_visit TableScan",
- "post_visit TableScan",
- ]
- );
- }
-
- fn test_plan() -> LogicalPlan {
- let schema = Schema::new(vec![
- Field::new("id", DataType::Int32, false),
- Field::new("state", DataType::Utf8, false),
- ]);
-
- LogicalPlanBuilder::scan_empty(None, &schema, Some(vec![0, 1]))
- .unwrap()
- .filter(col("state").eq(lit("CO")))
- .unwrap()
- .project(vec![col("id")])
- .unwrap()
- .build()
- .unwrap()
- }
-}