You are viewing a plain text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/26 02:25:37 UTC
[arrow-datafusion] branch master updated: Add `Expr::Exists` to represent EXISTS subquery expression (#2339)
This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 6ae7d9599 Add `Expr::Exists` to represent EXISTS subquery expression (#2339)
6ae7d9599 is described below
commit 6ae7d9599813b3aaf72b22a0d18f4d27bef0f730
Author: Andy Grove <ag...@apache.org>
AuthorDate: Mon Apr 25 20:25:33 2022 -0600
Add `Expr::Exists` to represent EXISTS subquery expression (#2339)
---
ballista/rust/core/src/serde/logical_plan/mod.rs | 8 ++++
datafusion/core/src/datasource/listing/helpers.rs | 1 +
datafusion/core/src/logical_plan/builder.rs | 27 +++++++++++++
datafusion/core/src/logical_plan/expr_rewriter.rs | 1 +
datafusion/core/src/logical_plan/expr_visitor.rs | 1 +
datafusion/core/src/logical_plan/mod.rs | 18 ++++-----
datafusion/core/src/logical_plan/plan.rs | 2 +-
.../core/src/optimizer/common_subexpr_eliminate.rs | 4 ++
.../core/src/optimizer/projection_push_down.rs | 1 +
.../core/src/optimizer/simplify_expressions.rs | 1 +
datafusion/core/src/optimizer/utils.rs | 14 ++++++-
datafusion/core/src/physical_plan/planner.rs | 4 ++
datafusion/core/src/prelude.rs | 2 +-
datafusion/core/src/sql/utils.rs | 7 ++--
datafusion/expr/src/expr.rs | 5 +++
datafusion/expr/src/expr_fn.rs | 9 ++++-
datafusion/expr/src/expr_schema.rs | 3 +-
datafusion/expr/src/lib.rs | 10 ++---
datafusion/expr/src/logical_plan/mod.rs | 2 +-
datafusion/expr/src/logical_plan/plan.rs | 47 +++++++++++++++++++++-
20 files changed, 142 insertions(+), 25 deletions(-)
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index b9ab11704..5307aff65 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -708,6 +708,14 @@ impl AsLogicalPlan for LogicalPlanNode {
))),
})
}
+ LogicalPlan::Subquery(_) => {
+ // note that the ballista and datafusion proto files need refactoring to allow
+ // LogicalExprNode to reference a LogicalPlanNode
+ // see https://github.com/apache/arrow-datafusion/issues/2338
+ Err(BallistaError::NotImplemented(
+ "Ballista does not support subqueries".to_string(),
+ ))
+ }
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
let input: protobuf::LogicalPlanNode =
protobuf::LogicalPlanNode::try_from_logical_plan(
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 362a8545b..e36616965 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -92,6 +92,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
| Expr::BinaryExpr { .. }
| Expr::Between { .. }
| Expr::InList { .. }
+ | Expr::Exists { .. }
| Expr::GetIndexedField { .. }
| Expr::Case { .. } => Recursion::Continue(self),
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index a0914ae73..c303cdec8 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -1202,8 +1202,10 @@ pub(crate) fn expand_qualified_wildcard(
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field};
+ use datafusion_expr::expr_fn::exists;
use crate::logical_plan::StringifiedPlan;
+ use crate::test::test_table_scan_with_name;
use super::super::{col, lit, sum};
use super::*;
@@ -1339,6 +1341,31 @@ mod tests {
Ok(())
}
+ #[test]
+ fn exists_subquery() -> Result<()> {
+ let foo = test_table_scan_with_name("foo")?;
+ let bar = test_table_scan_with_name("bar")?;
+ // Correlated subquery: filters `foo` on a column of the outer table `bar`.
+ let subquery = LogicalPlanBuilder::from(foo)
+ .project(vec![col("a")])?
+ .filter(col("a").eq(col("bar.a")))?
+ .build()?;
+ // Outer query: filters `bar` with an EXISTS over the subquery plan.
+ let outer_query = LogicalPlanBuilder::from(bar)
+ .project(vec![col("a")])?
+ .filter(exists(Arc::new(subquery)))?
+ .build()?;
+ // The Debug output nests the whole subquery plan inside `EXISTS (...)`.
+ let expected = "Filter: EXISTS (\
+ Subquery: Filter: #foo.a = #bar.a\
+ \n Projection: #foo.a\
+ \n TableScan: foo projection=None)\
+ \n Projection: #bar.a\n TableScan: bar projection=None";
+ assert_eq!(expected, format!("{:?}", outer_query));
+
+ Ok(())
+ }
+
#[test]
fn projection_non_unique_names() -> Result<()> {
let plan = LogicalPlanBuilder::scan_empty(
diff --git a/datafusion/core/src/logical_plan/expr_rewriter.rs b/datafusion/core/src/logical_plan/expr_rewriter.rs
index b8afa8a36..e99fc7e66 100644
--- a/datafusion/core/src/logical_plan/expr_rewriter.rs
+++ b/datafusion/core/src/logical_plan/expr_rewriter.rs
@@ -111,6 +111,7 @@ impl ExprRewritable for Expr {
let expr = match self {
Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
Expr::Column(_) => self.clone(),
+ Expr::Exists(_) => self.clone(),
Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names),
Expr::Literal(value) => Expr::Literal(value),
Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
diff --git a/datafusion/core/src/logical_plan/expr_visitor.rs b/datafusion/core/src/logical_plan/expr_visitor.rs
index 27fa9d3c7..bfab0ca04 100644
--- a/datafusion/core/src/logical_plan/expr_visitor.rs
+++ b/datafusion/core/src/logical_plan/expr_visitor.rs
@@ -106,6 +106,7 @@ impl ExprVisitable for Expr {
Expr::Column(_)
| Expr::ScalarVariable(_, _)
| Expr::Literal(_)
+ | Expr::Exists(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. } => Ok(visitor),
Expr::BinaryExpr { left, right, .. } => {
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index f131d3389..d933b0229 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -40,14 +40,14 @@ pub use expr::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
- count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exp,
- exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano,
- ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif,
- octet_length, or, random, regexp_match, regexp_replace, repeat, replace, reverse,
- right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part,
- sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros,
- to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper,
- when, Column, Expr, ExprSchema, Literal,
+ count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
+ exists, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit,
+ lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now,
+ now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat,
+ replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum,
+ sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
+ to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
+ trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
};
pub use expr_rewriter::{
normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs,
@@ -60,6 +60,6 @@ pub use plan::{
CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, Limit,
LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
- TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
+ Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
};
pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 89047af44..08d1fa120 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -28,7 +28,7 @@ pub use crate::logical_expr::{
CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension,
FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
- StringifiedPlan, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+ StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
UserDefinedLogicalNode, Values, Window,
},
TableProviderFilterPushDown, TableSource,
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 39964df4a..4a9bf8e91 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
| LogicalPlan::TableScan { .. }
| LogicalPlan::Values(_)
| LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
@@ -459,6 +460,9 @@ impl ExprIdentifierVisitor<'_> {
desc.push_str("InList-");
desc.push_str(&negated.to_string());
}
+ Expr::Exists(_) => {
+ desc.push_str("Exists-");
+ }
Expr::Wildcard => {
desc.push_str("Wildcard-");
}
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 10bf5d10f..5062082e8 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -466,6 +466,7 @@ fn optimize_plan(
| LogicalPlan::Filter { .. }
| LogicalPlan::Repartition(_)
| LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Subquery(_)
| LogicalPlan::Values(_)
| LogicalPlan::Sort { .. }
| LogicalPlan::CreateExternalTable(_)
diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs
index 394ab68bd..93d9fb506 100644
--- a/datafusion/core/src/optimizer/simplify_expressions.rs
+++ b/datafusion/core/src/optimizer/simplify_expressions.rs
@@ -375,6 +375,7 @@ impl<'a> ConstEvaluator<'a> {
| Expr::AggregateUDF { .. }
| Expr::ScalarVariable(_, _)
| Expr::Column(_)
+ | Expr::Exists(_)
| Expr::WindowFunction { .. }
| Expr::Sort { .. }
| Expr::Wildcard
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 0dab2d3ed..939af8041 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -19,8 +19,9 @@
use super::optimizer::OptimizerRule;
use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{
- Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window,
+use datafusion_expr::logical_plan::{
+ Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Subquery,
+ SubqueryAlias, Window,
};
use crate::logical_plan::{
@@ -84,6 +85,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
| Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. }
| Expr::InList { .. }
+ | Expr::Exists(_)
| Expr::Wildcard
| Expr::QualifiedWildcard { .. }
| Expr::GetIndexedField { .. } => {}
@@ -223,6 +225,12 @@ pub fn from_plan(
let right = &inputs[1];
LogicalPlanBuilder::from(left).cross_join(right)?.build()
}
+ LogicalPlan::Subquery(_) => {
+ let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+ Ok(LogicalPlan::Subquery(Subquery {
+ subquery: Arc::new(subquery),
+ }))
+ }
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
let schema = inputs[0].schema().as_ref().clone().into();
let schema =
@@ -363,6 +371,7 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
}
Ok(expr_list)
}
+ Expr::Exists(_) => Ok(vec![]),
Expr::Wildcard { .. } => Err(DataFusionError::Internal(
"Wildcard expressions are not valid in a logical query plan".to_owned(),
)),
@@ -497,6 +506,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
Expr::Column(_)
| Expr::Literal(_)
| Expr::InList { .. }
+ | Expr::Exists(_)
| Expr::ScalarVariable(_, _) => Ok(expr.clone()),
Expr::Sort {
asc, nulls_first, ..
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 5b34a65dc..84785777b 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -186,6 +186,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
Ok(format!("{} IN ({:?})", expr, list))
}
}
+ Expr::Exists(_) => Err(DataFusionError::NotImplemented(
+ "EXISTS is not yet supported in the physical plan".to_string(),
+ )),
Expr::Between {
expr,
negated,
@@ -780,6 +783,7 @@ impl DefaultPhysicalPlanner {
let right = self.create_initial_plan(right, session_state).await?;
Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
}
+ LogicalPlan::Subquery(_) => todo!(),
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row,
schema,
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index 18343bce7..e0c418417 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -33,7 +33,7 @@ pub use crate::execution::options::{
pub use crate::logical_plan::{
approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
- in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now,
+ exists, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now,
octet_length, random, regexp_match, regexp_replace, repeat, replace, reverse, right,
rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr,
sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning,
diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs
index 5e090c99b..f9242c6aa 100644
--- a/datafusion/core/src/sql/utils.rs
+++ b/datafusion/core/src/sql/utils.rs
@@ -368,9 +368,10 @@ where
asc: *asc,
nulls_first: *nulls_first,
}),
- Expr::Column { .. } | Expr::Literal(_) | Expr::ScalarVariable(_, _) => {
- Ok(expr.clone())
- }
+ Expr::Column { .. }
+ | Expr::Literal(_)
+ | Expr::ScalarVariable(_, _)
+ | Expr::Exists(_) => Ok(expr.clone()),
Expr::Wildcard => Ok(Expr::Wildcard),
Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField {
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 4bad6e31f..88c489670 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -20,6 +20,7 @@
use crate::aggregate_function;
use crate::built_in_function;
use crate::expr_fn::binary_expr;
+use crate::logical_plan::Subquery;
use crate::window_frame;
use crate::window_function;
use crate::AggregateUDF;
@@ -226,6 +227,8 @@ pub enum Expr {
/// Whether the expression is negated
negated: bool,
},
+ /// EXISTS subquery
+ Exists(Subquery),
/// Represents a reference to all fields in a schema.
Wildcard,
/// Represents a reference to all fields in a specific schema.
@@ -431,6 +434,7 @@ impl fmt::Debug for Expr {
Expr::Negative(expr) => write!(f, "(- {:?})", expr),
Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr),
Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr),
+ Expr::Exists(subquery) => write!(f, "EXISTS ({:?})", subquery),
Expr::BinaryExpr { left, op, right } => {
write!(f, "{:?} {} {:?}", left, op, right)
}
@@ -618,6 +622,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
let expr = create_name(expr, input_schema)?;
Ok(format!("{} IS NOT NULL", expr))
}
+ Expr::Exists(_) => Ok("EXISTS".to_string()),
Expr::GetIndexedField { expr, key } => {
let expr = create_name(expr, input_schema)?;
Ok(format!("{}[{}]", expr, key))
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index a723f5306..19c311f4f 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -18,7 +18,9 @@
//! Functions for creating logical expressions
use crate::conditional_expressions::CaseBuilder;
-use crate::{aggregate_function, built_in_function, lit, Expr, Operator};
+use crate::logical_plan::Subquery;
+use crate::{aggregate_function, built_in_function, lit, Expr, LogicalPlan, Operator};
+use std::sync::Arc;
/// Create a column expression based on a qualified or unqualified column name
pub fn col(ident: &str) -> Expr {
@@ -180,6 +182,11 @@ pub fn approx_percentile_cont_with_weight(
}
}
+/// Create an `EXISTS (<subquery>)` expression wrapping the given logical plan.
+pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
+ Expr::Exists(Subquery { subquery })
+}
+
// TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many
// varying arity functions
/// Create an convenience function representing a unary scalar function
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index a216281b3..4c6457962 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -100,6 +100,7 @@ impl ExprSchemable for Expr {
}
Expr::Not(_)
| Expr::IsNull(_)
+ | Expr::Exists(_)
| Expr::Between { .. }
| Expr::InList { .. }
| Expr::IsNotNull(_) => Ok(DataType::Boolean),
@@ -172,7 +173,7 @@ impl ExprSchemable for Expr {
| Expr::WindowFunction { .. }
| Expr::AggregateFunction { .. }
| Expr::AggregateUDF { .. } => Ok(true),
- Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
+ Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Exists(_) => Ok(false),
Expr::BinaryExpr {
ref left,
ref right,
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index b1e822077..3dd24600a 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -48,11 +48,11 @@ pub use expr_fn::{
abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
avg, bit_length, btrim, case, ceil, character_length, chr, coalesce, col, concat,
concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, date_part,
- date_trunc, digest, exp, floor, in_list, initcap, left, length, ln, log10, log2,
- lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or, random,
- regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim,
- sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
- substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
+ date_trunc, digest, exists, exp, floor, in_list, initcap, left, length, ln, log10,
+ log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or,
+ random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad,
+ rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with,
+ strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
to_timestamp_seconds, translate, trim, trunc, upper, when,
};
pub use expr_schema::ExprSchemable;
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 525740274..a37729f7d 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -23,7 +23,7 @@ pub use plan::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension, FileType,
Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
- PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, SubqueryAlias,
+ PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias,
TableScan, ToStringifiedPlan, Union, Values, Window,
};
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 8fefac67d..579898dbe 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -22,7 +22,8 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::{Column, DFSchemaRef, DataFusionError};
use std::collections::HashSet;
///! Logical plan types
-use std::fmt::{self, Display, Formatter};
+use std::fmt::{self, Debug, Display, Formatter};
+use std::hash::{Hash, Hasher};
use std::sync::Arc;
/// A LogicalPlan represents the different types of relational
@@ -66,6 +67,8 @@ pub enum LogicalPlan {
TableScan(TableScan),
/// Produces no rows: An empty relation with an empty schema
EmptyRelation(EmptyRelation),
+ /// Subquery
+ Subquery(Subquery),
/// Aliased relation provides, or changes, the name of a relation.
SubqueryAlias(SubqueryAlias),
/// Produces the first `n` tuples from its input and discards the rest.
@@ -112,6 +115,7 @@ impl LogicalPlan {
LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+ LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
schema
@@ -161,6 +165,7 @@ impl LogicalPlan {
schemas.insert(0, schema);
schemas
}
+ LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.all_schemas(),
LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
vec![schema]
}
@@ -225,6 +230,7 @@ impl LogicalPlan {
// plans without expressions
LogicalPlan::TableScan { .. }
| LogicalPlan::EmptyRelation(_)
+ | LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
| LogicalPlan::CreateExternalTable(_)
@@ -254,6 +260,7 @@ impl LogicalPlan {
LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
LogicalPlan::Limit(Limit { input, .. }) => vec![input],
+ LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
LogicalPlan::Extension(extension) => extension.node.inputs(),
LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
@@ -392,6 +399,9 @@ impl LogicalPlan {
true
}
LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
+ LogicalPlan::Subquery(Subquery { subquery, .. }) => {
+ subquery.accept(visitor)?
+ }
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
input.accept(visitor)?
}
@@ -766,6 +776,9 @@ impl LogicalPlan {
}
},
LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
+ LogicalPlan::Subquery(Subquery { subquery, .. }) => {
+ write!(f, "Subquery: {:?}", subquery)
+ }
LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
write!(f, "SubqueryAlias: {}", alias)
}
@@ -1141,6 +1154,38 @@ pub struct Join {
pub null_equals_null: bool,
}
+/// A logical plan nested inside an expression, e.g. the operand of `EXISTS`.
+#[derive(Clone)]
+pub struct Subquery {
+    /// The subquery
+    pub subquery: Arc<LogicalPlan>,
+}
+
+impl Debug for Subquery {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "Subquery: {:?}", self.subquery)
+    }
+}
+
+/// The wrapped `LogicalPlan` is not hashed structurally; a `Subquery` is
+/// identified by the `Arc`-shared plan it points to. Hashing that pointer
+/// keeps `Hash` consistent with the identity-based `PartialEq` below.
+impl Hash for Subquery {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        Arc::as_ptr(&self.subquery).hash(state);
+    }
+}
+
+/// Identity (not structural) equality: a `Subquery` equals itself and its
+/// clones, so an `Expr::Exists` compares equal to a clone of itself and can
+/// be found by `contains`/hash lookups. Distinct `Arc`s are never equal,
+/// even when the plans they hold are structurally identical.
+impl PartialEq for Subquery {
+    fn eq(&self, other: &Self) -> bool {
+        Arc::ptr_eq(&self.subquery, &other.subquery)
+    }
+}
+
/// Logical partitioning schemes supported by the repartition operator.
#[derive(Debug, Clone)]
pub enum Partitioning {