You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@arrow.apache.org by ag...@apache.org on 2022/04/26 02:25:37 UTC

[arrow-datafusion] branch master updated: Add `Expr::Exists` to represent EXISTS subquery expression (#2339)

This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ae7d9599 Add `Expr::Exists` to represent EXISTS subquery expression (#2339)
6ae7d9599 is described below

commit 6ae7d9599813b3aaf72b22a0d18f4d27bef0f730
Author: Andy Grove <ag...@apache.org>
AuthorDate: Mon Apr 25 20:25:33 2022 -0600

    Add `Expr::Exists` to represent EXISTS subquery expression (#2339)
---
 ballista/rust/core/src/serde/logical_plan/mod.rs   |  8 ++++
 datafusion/core/src/datasource/listing/helpers.rs  |  1 +
 datafusion/core/src/logical_plan/builder.rs        | 27 +++++++++++++
 datafusion/core/src/logical_plan/expr_rewriter.rs  |  1 +
 datafusion/core/src/logical_plan/expr_visitor.rs   |  1 +
 datafusion/core/src/logical_plan/mod.rs            | 18 ++++-----
 datafusion/core/src/logical_plan/plan.rs           |  2 +-
 .../core/src/optimizer/common_subexpr_eliminate.rs |  4 ++
 .../core/src/optimizer/projection_push_down.rs     |  1 +
 .../core/src/optimizer/simplify_expressions.rs     |  1 +
 datafusion/core/src/optimizer/utils.rs             | 14 ++++++-
 datafusion/core/src/physical_plan/planner.rs       |  4 ++
 datafusion/core/src/prelude.rs                     |  2 +-
 datafusion/core/src/sql/utils.rs                   |  7 ++--
 datafusion/expr/src/expr.rs                        |  5 +++
 datafusion/expr/src/expr_fn.rs                     |  9 ++++-
 datafusion/expr/src/expr_schema.rs                 |  3 +-
 datafusion/expr/src/lib.rs                         | 10 ++---
 datafusion/expr/src/logical_plan/mod.rs            |  2 +-
 datafusion/expr/src/logical_plan/plan.rs           | 47 +++++++++++++++++++++-
 20 files changed, 142 insertions(+), 25 deletions(-)

diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index b9ab11704..5307aff65 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -708,6 +708,14 @@ impl AsLogicalPlan for LogicalPlanNode {
                     ))),
                 })
             }
+            LogicalPlan::Subquery(_) => {
+                // note that the ballista and datafusion proto files need refactoring to allow
+                // LogicalExprNode to reference a LogicalPlanNode
+                // see https://github.com/apache/arrow-datafusion/issues/2338
+                Err(BallistaError::NotImplemented(
+                    "Ballista does not support subqueries".to_string(),
+                ))
+            }
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
                 let input: protobuf::LogicalPlanNode =
                     protobuf::LogicalPlanNode::try_from_logical_plan(
diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 362a8545b..e36616965 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -92,6 +92,7 @@ impl ExpressionVisitor for ApplicabilityVisitor<'_> {
             | Expr::BinaryExpr { .. }
             | Expr::Between { .. }
             | Expr::InList { .. }
+            | Expr::Exists { .. }
             | Expr::GetIndexedField { .. }
             | Expr::Case { .. } => Recursion::Continue(self),
 
diff --git a/datafusion/core/src/logical_plan/builder.rs b/datafusion/core/src/logical_plan/builder.rs
index a0914ae73..c303cdec8 100644
--- a/datafusion/core/src/logical_plan/builder.rs
+++ b/datafusion/core/src/logical_plan/builder.rs
@@ -1202,8 +1202,10 @@ pub(crate) fn expand_qualified_wildcard(
 #[cfg(test)]
 mod tests {
     use arrow::datatypes::{DataType, Field};
+    use datafusion_expr::expr_fn::exists;
 
     use crate::logical_plan::StringifiedPlan;
+    use crate::test::test_table_scan_with_name;
 
     use super::super::{col, lit, sum};
     use super::*;
@@ -1339,6 +1341,31 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn exists_subquery() -> Result<()> {
+        let foo = test_table_scan_with_name("foo")?;
+        let bar = test_table_scan_with_name("bar")?;
+
+        let subquery = LogicalPlanBuilder::from(foo)
+            .project(vec![col("a")])?
+            .filter(col("a").eq(col("bar.a")))?
+            .build()?;
+
+        let outer_query = LogicalPlanBuilder::from(bar)
+            .project(vec![col("a")])?
+            .filter(exists(Arc::new(subquery)))?
+            .build()?;
+
+        let expected = "Filter: EXISTS (\
+            Subquery: Filter: #foo.a = #bar.a\
+            \n  Projection: #foo.a\
+            \n    TableScan: foo projection=None)\
+        \n  Projection: #bar.a\n    TableScan: bar projection=None";
+        assert_eq!(expected, format!("{:?}", outer_query));
+
+        Ok(())
+    }
+
     #[test]
     fn projection_non_unique_names() -> Result<()> {
         let plan = LogicalPlanBuilder::scan_empty(
diff --git a/datafusion/core/src/logical_plan/expr_rewriter.rs b/datafusion/core/src/logical_plan/expr_rewriter.rs
index b8afa8a36..e99fc7e66 100644
--- a/datafusion/core/src/logical_plan/expr_rewriter.rs
+++ b/datafusion/core/src/logical_plan/expr_rewriter.rs
@@ -111,6 +111,7 @@ impl ExprRewritable for Expr {
         let expr = match self {
             Expr::Alias(expr, name) => Expr::Alias(rewrite_boxed(expr, rewriter)?, name),
             Expr::Column(_) => self.clone(),
+            Expr::Exists(_) => self.clone(),
             Expr::ScalarVariable(ty, names) => Expr::ScalarVariable(ty, names),
             Expr::Literal(value) => Expr::Literal(value),
             Expr::BinaryExpr { left, op, right } => Expr::BinaryExpr {
diff --git a/datafusion/core/src/logical_plan/expr_visitor.rs b/datafusion/core/src/logical_plan/expr_visitor.rs
index 27fa9d3c7..bfab0ca04 100644
--- a/datafusion/core/src/logical_plan/expr_visitor.rs
+++ b/datafusion/core/src/logical_plan/expr_visitor.rs
@@ -106,6 +106,7 @@ impl ExprVisitable for Expr {
             Expr::Column(_)
             | Expr::ScalarVariable(_, _)
             | Expr::Literal(_)
+            | Expr::Exists(_)
             | Expr::Wildcard
             | Expr::QualifiedWildcard { .. } => Ok(visitor),
             Expr::BinaryExpr { left, right, .. } => {
diff --git a/datafusion/core/src/logical_plan/mod.rs b/datafusion/core/src/logical_plan/mod.rs
index f131d3389..d933b0229 100644
--- a/datafusion/core/src/logical_plan/mod.rs
+++ b/datafusion/core/src/logical_plan/mod.rs
@@ -40,14 +40,14 @@ pub use expr::{
     abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
     avg, bit_length, btrim, call_fn, case, ceil, character_length, chr, coalesce, col,
     columnize_expr, combine_filters, concat, concat_expr, concat_ws, concat_ws_expr, cos,
-    count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest, exp,
-    exprlist_to_fields, floor, in_list, initcap, left, length, lit, lit_timestamp_nano,
-    ln, log10, log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif,
-    octet_length, or, random, regexp_match, regexp_replace, repeat, replace, reverse,
-    right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part,
-    sqrt, starts_with, strpos, substr, sum, tan, to_hex, to_timestamp_micros,
-    to_timestamp_millis, to_timestamp_seconds, translate, trim, trunc, unalias, upper,
-    when, Column, Expr, ExprSchema, Literal,
+    count, count_distinct, create_udaf, create_udf, date_part, date_trunc, digest,
+    exists, exp, exprlist_to_fields, floor, in_list, initcap, left, length, lit,
+    lit_timestamp_nano, ln, log10, log2, lower, lpad, ltrim, max, md5, min, now,
+    now_expr, nullif, octet_length, or, random, regexp_match, regexp_replace, repeat,
+    replace, reverse, right, round, rpad, rtrim, sha224, sha256, sha384, sha512, signum,
+    sin, split_part, sqrt, starts_with, strpos, substr, sum, tan, to_hex,
+    to_timestamp_micros, to_timestamp_millis, to_timestamp_seconds, translate, trim,
+    trunc, unalias, upper, when, Column, Expr, ExprSchema, Literal,
 };
 pub use expr_rewriter::{
     normalize_col, normalize_cols, replace_col, rewrite_sort_cols_by_aggs,
@@ -60,6 +60,6 @@ pub use plan::{
     CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable,
     CrossJoin, DropTable, EmptyRelation, FileType, JoinConstraint, JoinType, Limit,
     LogicalPlan, Partitioning, PlanType, PlanVisitor, Repartition, StringifiedPlan,
-    TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
+    Subquery, TableScan, ToStringifiedPlan, Union, UserDefinedLogicalNode, Values,
 };
 pub use registry::FunctionRegistry;
diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs
index 89047af44..08d1fa120 100644
--- a/datafusion/core/src/logical_plan/plan.rs
+++ b/datafusion/core/src/logical_plan/plan.rs
@@ -28,7 +28,7 @@ pub use crate::logical_expr::{
         CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension,
         FileType, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
         Partitioning, PlanType, PlanVisitor, Projection, Repartition, Sort,
-        StringifiedPlan, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
+        StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, Union,
         UserDefinedLogicalNode, Values, Window,
     },
     TableProviderFilterPushDown, TableSource,
diff --git a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
index 39964df4a..4a9bf8e91 100644
--- a/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
+++ b/datafusion/core/src/optimizer/common_subexpr_eliminate.rs
@@ -216,6 +216,7 @@ fn optimize(plan: &LogicalPlan, execution_props: &ExecutionProps) -> Result<Logi
         | LogicalPlan::TableScan { .. }
         | LogicalPlan::Values(_)
         | LogicalPlan::EmptyRelation(_)
+        | LogicalPlan::Subquery(_)
         | LogicalPlan::SubqueryAlias(_)
         | LogicalPlan::Limit(_)
         | LogicalPlan::CreateExternalTable(_)
@@ -459,6 +460,9 @@ impl ExprIdentifierVisitor<'_> {
                 desc.push_str("InList-");
                 desc.push_str(&negated.to_string());
             }
+            Expr::Exists(_) => {
+                desc.push_str("Exists-");
+            }
             Expr::Wildcard => {
                 desc.push_str("Wildcard-");
             }
diff --git a/datafusion/core/src/optimizer/projection_push_down.rs b/datafusion/core/src/optimizer/projection_push_down.rs
index 10bf5d10f..5062082e8 100644
--- a/datafusion/core/src/optimizer/projection_push_down.rs
+++ b/datafusion/core/src/optimizer/projection_push_down.rs
@@ -466,6 +466,7 @@ fn optimize_plan(
         | LogicalPlan::Filter { .. }
         | LogicalPlan::Repartition(_)
         | LogicalPlan::EmptyRelation(_)
+        | LogicalPlan::Subquery(_)
         | LogicalPlan::Values(_)
         | LogicalPlan::Sort { .. }
         | LogicalPlan::CreateExternalTable(_)
diff --git a/datafusion/core/src/optimizer/simplify_expressions.rs b/datafusion/core/src/optimizer/simplify_expressions.rs
index 394ab68bd..93d9fb506 100644
--- a/datafusion/core/src/optimizer/simplify_expressions.rs
+++ b/datafusion/core/src/optimizer/simplify_expressions.rs
@@ -375,6 +375,7 @@ impl<'a> ConstEvaluator<'a> {
             | Expr::AggregateUDF { .. }
             | Expr::ScalarVariable(_, _)
             | Expr::Column(_)
+            | Expr::Exists(_)
             | Expr::WindowFunction { .. }
             | Expr::Sort { .. }
             | Expr::Wildcard
diff --git a/datafusion/core/src/optimizer/utils.rs b/datafusion/core/src/optimizer/utils.rs
index 0dab2d3ed..939af8041 100644
--- a/datafusion/core/src/optimizer/utils.rs
+++ b/datafusion/core/src/optimizer/utils.rs
@@ -19,8 +19,9 @@
 
 use super::optimizer::OptimizerRule;
 use crate::execution::context::ExecutionProps;
-use crate::logical_plan::plan::{
-    Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, SubqueryAlias, Window,
+use datafusion_expr::logical_plan::{
+    Aggregate, Analyze, Extension, Filter, Join, Projection, Sort, Subquery,
+    SubqueryAlias, Window,
 };
 
 use crate::logical_plan::{
@@ -84,6 +85,7 @@ impl ExpressionVisitor for ColumnNameVisitor<'_> {
             | Expr::AggregateFunction { .. }
             | Expr::AggregateUDF { .. }
             | Expr::InList { .. }
+            | Expr::Exists(_)
             | Expr::Wildcard
             | Expr::QualifiedWildcard { .. }
             | Expr::GetIndexedField { .. } => {}
@@ -223,6 +225,12 @@ pub fn from_plan(
             let right = &inputs[1];
             LogicalPlanBuilder::from(left).cross_join(right)?.build()
         }
+        LogicalPlan::Subquery(_) => {
+            let subquery = LogicalPlanBuilder::from(inputs[0].clone()).build()?;
+            Ok(LogicalPlan::Subquery(Subquery {
+                subquery: Arc::new(subquery),
+            }))
+        }
         LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
             let schema = inputs[0].schema().as_ref().clone().into();
             let schema =
@@ -363,6 +371,7 @@ pub fn expr_sub_expressions(expr: &Expr) -> Result<Vec<Expr>> {
             }
             Ok(expr_list)
         }
+        Expr::Exists(_) => Ok(vec![]),
         Expr::Wildcard { .. } => Err(DataFusionError::Internal(
             "Wildcard expressions are not valid in a logical query plan".to_owned(),
         )),
@@ -497,6 +506,7 @@ pub fn rewrite_expression(expr: &Expr, expressions: &[Expr]) -> Result<Expr> {
         Expr::Column(_)
         | Expr::Literal(_)
         | Expr::InList { .. }
+        | Expr::Exists(_)
         | Expr::ScalarVariable(_, _) => Ok(expr.clone()),
         Expr::Sort {
             asc, nulls_first, ..
diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs
index 5b34a65dc..84785777b 100644
--- a/datafusion/core/src/physical_plan/planner.rs
+++ b/datafusion/core/src/physical_plan/planner.rs
@@ -186,6 +186,9 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result<String> {
                 Ok(format!("{} IN ({:?})", expr, list))
             }
         }
+        Expr::Exists(_) => Err(DataFusionError::NotImplemented(
+            "EXISTS is not yet supported in the physical plan".to_string(),
+        )),
         Expr::Between {
             expr,
             negated,
@@ -780,6 +783,7 @@ impl DefaultPhysicalPlanner {
                     let right = self.create_initial_plan(right, session_state).await?;
                     Ok(Arc::new(CrossJoinExec::try_new(left, right)?))
                 }
+                LogicalPlan::Subquery(_) => todo!(),
                 LogicalPlan::EmptyRelation(EmptyRelation {
                     produce_one_row,
                     schema,
diff --git a/datafusion/core/src/prelude.rs b/datafusion/core/src/prelude.rs
index 18343bce7..e0c418417 100644
--- a/datafusion/core/src/prelude.rs
+++ b/datafusion/core/src/prelude.rs
@@ -33,7 +33,7 @@ pub use crate::execution::options::{
 pub use crate::logical_plan::{
     approx_percentile_cont, array, ascii, avg, bit_length, btrim, character_length, chr,
     coalesce, col, concat, concat_ws, count, create_udf, date_part, date_trunc, digest,
-    in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now,
+    exists, in_list, initcap, left, length, lit, lower, lpad, ltrim, max, md5, min, now,
     octet_length, random, regexp_match, regexp_replace, repeat, replace, reverse, right,
     rpad, rtrim, sha224, sha256, sha384, sha512, split_part, starts_with, strpos, substr,
     sum, to_hex, translate, trim, upper, Column, JoinType, Partitioning,
diff --git a/datafusion/core/src/sql/utils.rs b/datafusion/core/src/sql/utils.rs
index 5e090c99b..f9242c6aa 100644
--- a/datafusion/core/src/sql/utils.rs
+++ b/datafusion/core/src/sql/utils.rs
@@ -368,9 +368,10 @@ where
                 asc: *asc,
                 nulls_first: *nulls_first,
             }),
-            Expr::Column { .. } | Expr::Literal(_) | Expr::ScalarVariable(_, _) => {
-                Ok(expr.clone())
-            }
+            Expr::Column { .. }
+            | Expr::Literal(_)
+            | Expr::ScalarVariable(_, _)
+            | Expr::Exists(_) => Ok(expr.clone()),
             Expr::Wildcard => Ok(Expr::Wildcard),
             Expr::QualifiedWildcard { .. } => Ok(expr.clone()),
             Expr::GetIndexedField { expr, key } => Ok(Expr::GetIndexedField {
diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs
index 4bad6e31f..88c489670 100644
--- a/datafusion/expr/src/expr.rs
+++ b/datafusion/expr/src/expr.rs
@@ -20,6 +20,7 @@
 use crate::aggregate_function;
 use crate::built_in_function;
 use crate::expr_fn::binary_expr;
+use crate::logical_plan::Subquery;
 use crate::window_frame;
 use crate::window_function;
 use crate::AggregateUDF;
@@ -226,6 +227,8 @@ pub enum Expr {
         /// Whether the expression is negated
         negated: bool,
     },
+    /// EXISTS subquery
+    Exists(Subquery),
     /// Represents a reference to all fields in a schema.
     Wildcard,
     /// Represents a reference to all fields in a specific schema.
@@ -431,6 +434,7 @@ impl fmt::Debug for Expr {
             Expr::Negative(expr) => write!(f, "(- {:?})", expr),
             Expr::IsNull(expr) => write!(f, "{:?} IS NULL", expr),
             Expr::IsNotNull(expr) => write!(f, "{:?} IS NOT NULL", expr),
+            Expr::Exists(subquery) => write!(f, "EXISTS ({:?})", subquery),
             Expr::BinaryExpr { left, op, right } => {
                 write!(f, "{:?} {} {:?}", left, op, right)
             }
@@ -618,6 +622,7 @@ fn create_name(e: &Expr, input_schema: &DFSchema) -> Result<String> {
             let expr = create_name(expr, input_schema)?;
             Ok(format!("{} IS NOT NULL", expr))
         }
+        Expr::Exists(_) => Ok("EXISTS".to_string()),
         Expr::GetIndexedField { expr, key } => {
             let expr = create_name(expr, input_schema)?;
             Ok(format!("{}[{}]", expr, key))
diff --git a/datafusion/expr/src/expr_fn.rs b/datafusion/expr/src/expr_fn.rs
index a723f5306..19c311f4f 100644
--- a/datafusion/expr/src/expr_fn.rs
+++ b/datafusion/expr/src/expr_fn.rs
@@ -18,7 +18,9 @@
 //! Functions for creating logical expressions
 
 use crate::conditional_expressions::CaseBuilder;
-use crate::{aggregate_function, built_in_function, lit, Expr, Operator};
+use crate::logical_plan::Subquery;
+use crate::{aggregate_function, built_in_function, lit, Expr, LogicalPlan, Operator};
+use std::sync::Arc;
 
 /// Create a column expression based on a qualified or unqualified column name
 pub fn col(ident: &str) -> Expr {
@@ -180,6 +182,11 @@ pub fn approx_percentile_cont_with_weight(
     }
 }
 
+/// Create an EXISTS subquery expression
+pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
+    Expr::Exists(Subquery { subquery })
+}
+
 // TODO(kszucs): this seems buggy, unary_scalar_expr! is used for many
 // varying arity functions
 /// Create an convenience function representing a unary scalar function
diff --git a/datafusion/expr/src/expr_schema.rs b/datafusion/expr/src/expr_schema.rs
index a216281b3..4c6457962 100644
--- a/datafusion/expr/src/expr_schema.rs
+++ b/datafusion/expr/src/expr_schema.rs
@@ -100,6 +100,7 @@ impl ExprSchemable for Expr {
             }
             Expr::Not(_)
             | Expr::IsNull(_)
+            | Expr::Exists(_)
             | Expr::Between { .. }
             | Expr::InList { .. }
             | Expr::IsNotNull(_) => Ok(DataType::Boolean),
@@ -172,7 +173,7 @@ impl ExprSchemable for Expr {
             | Expr::WindowFunction { .. }
             | Expr::AggregateFunction { .. }
             | Expr::AggregateUDF { .. } => Ok(true),
-            Expr::IsNull(_) | Expr::IsNotNull(_) => Ok(false),
+            Expr::IsNull(_) | Expr::IsNotNull(_) | Expr::Exists(_) => Ok(false),
             Expr::BinaryExpr {
                 ref left,
                 ref right,
diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs
index b1e822077..3dd24600a 100644
--- a/datafusion/expr/src/lib.rs
+++ b/datafusion/expr/src/lib.rs
@@ -48,11 +48,11 @@ pub use expr_fn::{
     abs, acos, and, approx_distinct, approx_percentile_cont, array, ascii, asin, atan,
     avg, bit_length, btrim, case, ceil, character_length, chr, coalesce, col, concat,
     concat_expr, concat_ws, concat_ws_expr, cos, count, count_distinct, date_part,
-    date_trunc, digest, exp, floor, in_list, initcap, left, length, ln, log10, log2,
-    lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or, random,
-    regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad, rtrim,
-    sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with, strpos,
-    substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
+    date_trunc, digest, exists, exp, floor, in_list, initcap, left, length, ln, log10,
+    log2, lower, lpad, ltrim, max, md5, min, now, now_expr, nullif, octet_length, or,
+    random, regexp_match, regexp_replace, repeat, replace, reverse, right, round, rpad,
+    rtrim, sha224, sha256, sha384, sha512, signum, sin, split_part, sqrt, starts_with,
+    strpos, substr, sum, tan, to_hex, to_timestamp_micros, to_timestamp_millis,
     to_timestamp_seconds, translate, trim, trunc, upper, when,
 };
 pub use expr_schema::ExprSchemable;
diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs
index 525740274..a37729f7d 100644
--- a/datafusion/expr/src/logical_plan/mod.rs
+++ b/datafusion/expr/src/logical_plan/mod.rs
@@ -23,7 +23,7 @@ pub use plan::{
     Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
     CreateMemoryTable, CrossJoin, DropTable, EmptyRelation, Explain, Extension, FileType,
     Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
-    PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, SubqueryAlias,
+    PlanVisitor, Projection, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias,
     TableScan, ToStringifiedPlan, Union, Values, Window,
 };
 
diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs
index 8fefac67d..579898dbe 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -22,7 +22,8 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion_common::{Column, DFSchemaRef, DataFusionError};
 use std::collections::HashSet;
 ///! Logical plan types
-use std::fmt::{self, Display, Formatter};
+use std::fmt::{self, Debug, Display, Formatter};
+use std::hash::{Hash, Hasher};
 use std::sync::Arc;
 
 /// A LogicalPlan represents the different types of relational
@@ -66,6 +67,8 @@ pub enum LogicalPlan {
     TableScan(TableScan),
     /// Produces no rows: An empty relation with an empty schema
     EmptyRelation(EmptyRelation),
+    /// Subquery
+    Subquery(Subquery),
     /// Aliased relation provides, or changes, the name of a relation.
     SubqueryAlias(SubqueryAlias),
     /// Produces the first `n` tuples from its input and discards the rest.
@@ -112,6 +115,7 @@ impl LogicalPlan {
             LogicalPlan::CrossJoin(CrossJoin { schema, .. }) => schema,
             LogicalPlan::Repartition(Repartition { input, .. }) => input.schema(),
             LogicalPlan::Limit(Limit { input, .. }) => input.schema(),
+            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.schema(),
             LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => schema,
             LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) => {
                 schema
@@ -161,6 +165,7 @@ impl LogicalPlan {
                 schemas.insert(0, schema);
                 schemas
             }
+            LogicalPlan::Subquery(Subquery { subquery, .. }) => subquery.all_schemas(),
             LogicalPlan::SubqueryAlias(SubqueryAlias { schema, .. }) => {
                 vec![schema]
             }
@@ -225,6 +230,7 @@ impl LogicalPlan {
             // plans without expressions
             LogicalPlan::TableScan { .. }
             | LogicalPlan::EmptyRelation(_)
+            | LogicalPlan::Subquery(_)
             | LogicalPlan::SubqueryAlias(_)
             | LogicalPlan::Limit(_)
             | LogicalPlan::CreateExternalTable(_)
@@ -254,6 +260,7 @@ impl LogicalPlan {
             LogicalPlan::Join(Join { left, right, .. }) => vec![left, right],
             LogicalPlan::CrossJoin(CrossJoin { left, right, .. }) => vec![left, right],
             LogicalPlan::Limit(Limit { input, .. }) => vec![input],
+            LogicalPlan::Subquery(Subquery { subquery, .. }) => vec![subquery],
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => vec![input],
             LogicalPlan::Extension(extension) => extension.node.inputs(),
             LogicalPlan::Union(Union { inputs, .. }) => inputs.iter().collect(),
@@ -392,6 +399,9 @@ impl LogicalPlan {
                 true
             }
             LogicalPlan::Limit(Limit { input, .. }) => input.accept(visitor)?,
+            LogicalPlan::Subquery(Subquery { subquery, .. }) => {
+                subquery.accept(visitor)?
+            }
             LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => {
                 input.accept(visitor)?
             }
@@ -766,6 +776,9 @@ impl LogicalPlan {
                         }
                     },
                     LogicalPlan::Limit(Limit { ref n, .. }) => write!(f, "Limit: {}", n),
+                    LogicalPlan::Subquery(Subquery { subquery, .. }) => {
+                        write!(f, "Subquery: {:?}", subquery)
+                    }
                     LogicalPlan::SubqueryAlias(SubqueryAlias { ref alias, .. }) => {
                         write!(f, "SubqueryAlias: {}", alias)
                     }
@@ -1141,6 +1154,38 @@ pub struct Join {
     pub null_equals_null: bool,
 }
 
+/// Subquery
+#[derive(Clone)]
+pub struct Subquery {
+    /// The subquery
+    pub subquery: Arc<LogicalPlan>,
+}
+
+impl Debug for Subquery {
+    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
+        write!(f, "Subquery: {:?}", self.subquery)
+    }
+}
+
+impl Hash for Subquery {
+    fn hash<H: Hasher>(&self, state: &mut H) {
+        state.finish();
+    }
+
+    fn hash_slice<H: Hasher>(_data: &[Self], state: &mut H)
+    where
+        Self: Sized,
+    {
+        state.finish();
+    }
+}
+
+impl PartialEq for Subquery {
+    fn eq(&self, _other: &Self) -> bool {
+        false
+    }
+}
+
 /// Logical partitioning schemes supported by the repartition operator.
 #[derive(Debug, Clone)]
 pub enum Partitioning {