You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2021/07/07 17:36:18 UTC

[spark] branch branch-3.1 updated: [SPARK-36020][SQL][3.1] Check logical link in remove redundant projects

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 7297bdb  [SPARK-36020][SQL][3.1] Check logical link in remove redundant projects
7297bdb is described below

commit 7297bdb4c9b244ff18bbe45781daf2fb7288aae6
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Thu Jul 8 01:35:39 2021 +0800

    [SPARK-36020][SQL][3.1] Check logical link in remove redundant projects
    
    backport [#33222](https://github.com/apache/spark/pull/33222)
    
    ### What changes were proposed in this pull request?
    
    The RemoveRedundantProjects feature can sometimes conflict with the AQE broadcast threshold (PR). After the project is removed, the link from the physical plan to the logical plan can change, and we may end up with a Project above a LogicalQueryStage. This breaks the AQE broadcast threshold, because the stats of the Project do not have the isRuntime = true flag, and thus the normal broadcast threshold is still used.
    
    This PR updates RemoveRedundantProjects so that it does not remove a ProjectExec whose logical plan link differs from its child's.
    
    ### Why are the changes needed?
    
    Make AQE broadcast threshold work in more cases.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new tests
    
    Closes #33238 from ulysses-you/SPARK-36020-3-1.
    
    Lead-authored-by: ulysses-you <ul...@gmail.com>
    Co-authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/execution/RemoveRedundantProjects.scala    |  13 +-
 .../approved-plans-v1_4/q38.sf100/explain.txt      | 181 +++++++++++----------
 .../approved-plans-v1_4/q38.sf100/simplified.txt   | 167 +++++++++----------
 .../approved-plans-v1_4/q38/explain.txt            | 141 ++++++++--------
 .../approved-plans-v1_4/q38/simplified.txt         | 103 ++++++------
 .../approved-plans-v1_4/q87.sf100/explain.txt      | 181 +++++++++++----------
 .../approved-plans-v1_4/q87.sf100/simplified.txt   | 167 +++++++++----------
 .../approved-plans-v1_4/q87/explain.txt            | 141 ++++++++--------
 .../approved-plans-v1_4/q87/simplified.txt         | 103 ++++++------
 .../approved-plans-v2_7/q22.sf100/explain.txt      |  77 +++++----
 .../approved-plans-v2_7/q22.sf100/simplified.txt   |  81 ++++-----
 .../approved-plans-v2_7/q22/explain.txt            |  67 ++++----
 .../approved-plans-v2_7/q22/simplified.txt         |  59 +++----
 .../execution/LogicalPlanTagInSparkPlanSuite.scala |   2 +-
 .../execution/RemoveRedundantProjectsSuite.scala   |  17 +-
 .../adaptive/AdaptiveQueryExecSuite.scala          |  21 +++
 16 files changed, 799 insertions(+), 722 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
index 1520b48..eeeb868 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/RemoveRedundantProjects.scala
@@ -48,10 +48,8 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
   private def removeProject(plan: SparkPlan, requireOrdering: Boolean): SparkPlan = {
     plan match {
       case p @ ProjectExec(_, child) =>
-        if (isRedundant(p, child, requireOrdering)) {
-          val newPlan = removeProject(child, requireOrdering)
-          newPlan.setLogicalLink(child.logicalLink.get)
-          newPlan
+        if (isRedundant(p, child, requireOrdering) && canRemove(p, child)) {
+          removeProject(child, requireOrdering)
         } else {
           p.mapChildren(removeProject(_, false))
         }
@@ -110,4 +108,11 @@ object RemoveRedundantProjects extends Rule[SparkPlan] {
         }
     }
   }
+
+  // SPARK-36020: Currently a project can only be removed if (1) its logical link is empty or (2)
+  // its logical link is the same as the child's logical link. This is to ensure the physical
+  // plan node can correctly map to its logical plan node in AQE.
+  private def canRemove(project: ProjectExec, child: SparkPlan): Boolean = {
+    project.logicalLink.isEmpty || project.logicalLink.exists(child.logicalLink.contains)
+  }
 }
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/explain.txt
index 7465ddae..a925e29 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/explain.txt
@@ -1,71 +1,72 @@
 == Physical Plan ==
-* HashAggregate (67)
-+- Exchange (66)
-   +- * HashAggregate (65)
-      +- * HashAggregate (64)
-         +- * HashAggregate (63)
-            +- * HashAggregate (62)
-               +- * HashAggregate (61)
-                  +- * HashAggregate (60)
-                     +- Exchange (59)
-                        +- * HashAggregate (58)
-                           +- SortMergeJoin LeftSemi (57)
-                              :- SortMergeJoin LeftSemi (39)
-                              :  :- * Sort (21)
-                              :  :  +- Exchange (20)
-                              :  :     +- * Project (19)
-                              :  :        +- * SortMergeJoin Inner (18)
-                              :  :           :- * Sort (12)
-                              :  :           :  +- Exchange (11)
-                              :  :           :     +- * Project (10)
-                              :  :           :        +- * BroadcastHashJoin Inner BuildRight (9)
-                              :  :           :           :- * Filter (3)
-                              :  :           :           :  +- * ColumnarToRow (2)
-                              :  :           :           :     +- Scan parquet default.store_sales (1)
-                              :  :           :           +- BroadcastExchange (8)
-                              :  :           :              +- * Project (7)
-                              :  :           :                 +- * Filter (6)
-                              :  :           :                    +- * ColumnarToRow (5)
-                              :  :           :                       +- Scan parquet default.date_dim (4)
-                              :  :           +- * Sort (17)
-                              :  :              +- Exchange (16)
-                              :  :                 +- * Filter (15)
-                              :  :                    +- * ColumnarToRow (14)
-                              :  :                       +- Scan parquet default.customer (13)
-                              :  +- * Sort (38)
-                              :     +- Exchange (37)
-                              :        +- * HashAggregate (36)
-                              :           +- Exchange (35)
-                              :              +- * HashAggregate (34)
-                              :                 +- * Project (33)
-                              :                    +- * SortMergeJoin Inner (32)
-                              :                       :- * Sort (29)
-                              :                       :  +- Exchange (28)
-                              :                       :     +- * Project (27)
-                              :                       :        +- * BroadcastHashJoin Inner BuildRight (26)
-                              :                       :           :- * Filter (24)
-                              :                       :           :  +- * ColumnarToRow (23)
-                              :                       :           :     +- Scan parquet default.catalog_sales (22)
-                              :                       :           +- ReusedExchange (25)
-                              :                       +- * Sort (31)
-                              :                          +- ReusedExchange (30)
-                              +- * Sort (56)
-                                 +- Exchange (55)
-                                    +- * HashAggregate (54)
-                                       +- Exchange (53)
-                                          +- * HashAggregate (52)
-                                             +- * Project (51)
-                                                +- * SortMergeJoin Inner (50)
-                                                   :- * Sort (47)
-                                                   :  +- Exchange (46)
-                                                   :     +- * Project (45)
-                                                   :        +- * BroadcastHashJoin Inner BuildRight (44)
-                                                   :           :- * Filter (42)
-                                                   :           :  +- * ColumnarToRow (41)
-                                                   :           :     +- Scan parquet default.web_sales (40)
-                                                   :           +- ReusedExchange (43)
-                                                   +- * Sort (49)
-                                                      +- ReusedExchange (48)
+* HashAggregate (68)
++- Exchange (67)
+   +- * HashAggregate (66)
+      +- * HashAggregate (65)
+         +- * HashAggregate (64)
+            +- * HashAggregate (63)
+               +- * HashAggregate (62)
+                  +- * HashAggregate (61)
+                     +- Exchange (60)
+                        +- * HashAggregate (59)
+                           +- * Project (58)
+                              +- SortMergeJoin LeftSemi (57)
+                                 :- SortMergeJoin LeftSemi (39)
+                                 :  :- * Sort (21)
+                                 :  :  +- Exchange (20)
+                                 :  :     +- * Project (19)
+                                 :  :        +- * SortMergeJoin Inner (18)
+                                 :  :           :- * Sort (12)
+                                 :  :           :  +- Exchange (11)
+                                 :  :           :     +- * Project (10)
+                                 :  :           :        +- * BroadcastHashJoin Inner BuildRight (9)
+                                 :  :           :           :- * Filter (3)
+                                 :  :           :           :  +- * ColumnarToRow (2)
+                                 :  :           :           :     +- Scan parquet default.store_sales (1)
+                                 :  :           :           +- BroadcastExchange (8)
+                                 :  :           :              +- * Project (7)
+                                 :  :           :                 +- * Filter (6)
+                                 :  :           :                    +- * ColumnarToRow (5)
+                                 :  :           :                       +- Scan parquet default.date_dim (4)
+                                 :  :           +- * Sort (17)
+                                 :  :              +- Exchange (16)
+                                 :  :                 +- * Filter (15)
+                                 :  :                    +- * ColumnarToRow (14)
+                                 :  :                       +- Scan parquet default.customer (13)
+                                 :  +- * Sort (38)
+                                 :     +- Exchange (37)
+                                 :        +- * HashAggregate (36)
+                                 :           +- Exchange (35)
+                                 :              +- * HashAggregate (34)
+                                 :                 +- * Project (33)
+                                 :                    +- * SortMergeJoin Inner (32)
+                                 :                       :- * Sort (29)
+                                 :                       :  +- Exchange (28)
+                                 :                       :     +- * Project (27)
+                                 :                       :        +- * BroadcastHashJoin Inner BuildRight (26)
+                                 :                       :           :- * Filter (24)
+                                 :                       :           :  +- * ColumnarToRow (23)
+                                 :                       :           :     +- Scan parquet default.catalog_sales (22)
+                                 :                       :           +- ReusedExchange (25)
+                                 :                       +- * Sort (31)
+                                 :                          +- ReusedExchange (30)
+                                 +- * Sort (56)
+                                    +- Exchange (55)
+                                       +- * HashAggregate (54)
+                                          +- Exchange (53)
+                                             +- * HashAggregate (52)
+                                                +- * Project (51)
+                                                   +- * SortMergeJoin Inner (50)
+                                                      :- * Sort (47)
+                                                      :  +- Exchange (46)
+                                                      :     +- * Project (45)
+                                                      :        +- * BroadcastHashJoin Inner BuildRight (44)
+                                                      :           :- * Filter (42)
+                                                      :           :  +- * ColumnarToRow (41)
+                                                      :           :     +- Scan parquet default.web_sales (40)
+                                                      :           +- ReusedExchange (43)
+                                                      +- * Sort (49)
+                                                         +- ReusedExchange (48)
 
 
 (1) Scan parquet default.store_sales
@@ -115,7 +116,7 @@ Input [4]: [ss_sold_date_sk#1, ss_customer_sk#2, d_date_sk#3, d_date#4]
 
 (11) Exchange
 Input [2]: [ss_customer_sk#2, d_date#4]
-Arguments: hashpartitioning(ss_customer_sk#2, 5), true, [id=#7]
+Arguments: hashpartitioning(ss_customer_sk#2, 5), ENSURE_REQUIREMENTS, [id=#7]
 
 (12) Sort [codegen id : 3]
 Input [2]: [ss_customer_sk#2, d_date#4]
@@ -137,7 +138,7 @@ Condition : isnotnull(c_customer_sk#8)
 
 (16) Exchange
 Input [3]: [c_customer_sk#8, c_first_name#9, c_last_name#10]
-Arguments: hashpartitioning(c_customer_sk#8, 5), true, [id=#11]
+Arguments: hashpartitioning(c_customer_sk#8, 5), ENSURE_REQUIREMENTS, [id=#11]
 
 (17) Sort [codegen id : 5]
 Input [3]: [c_customer_sk#8, c_first_name#9, c_last_name#10]
@@ -154,7 +155,7 @@ Input [5]: [ss_customer_sk#2, d_date#4, c_customer_sk#8, c_first_name#9, c_last_
 
 (20) Exchange
 Input [3]: [d_date#4, c_first_name#9, c_last_name#10]
-Arguments: hashpartitioning(coalesce(c_last_name#10, ), isnull(c_last_name#10), coalesce(c_first_name#9, ), isnull(c_first_name#9), coalesce(d_date#4, 0), isnull(d_date#4), 5), true, [id=#12]
+Arguments: hashpartitioning(coalesce(c_last_name#10, ), isnull(c_last_name#10), coalesce(c_first_name#9, ), isnull(c_first_name#9), coalesce(d_date#4, 0), isnull(d_date#4), 5), ENSURE_REQUIREMENTS, [id=#12]
 
 (21) Sort [codegen id : 7]
 Input [3]: [d_date#4, c_first_name#9, c_last_name#10]
@@ -188,7 +189,7 @@ Input [4]: [cs_sold_date_sk#13, cs_bill_customer_sk#14, d_date_sk#15, d_date#16]
 
 (28) Exchange
 Input [2]: [cs_bill_customer_sk#14, d_date#16]
-Arguments: hashpartitioning(cs_bill_customer_sk#14, 5), true, [id=#17]
+Arguments: hashpartitioning(cs_bill_customer_sk#14, 5), ENSURE_REQUIREMENTS, [id=#17]
 
 (29) Sort [codegen id : 10]
 Input [2]: [cs_bill_customer_sk#14, d_date#16]
@@ -219,7 +220,7 @@ Results [3]: [c_last_name#20, c_first_name#19, d_date#16]
 
 (35) Exchange
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
-Arguments: hashpartitioning(c_last_name#20, c_first_name#19, d_date#16, 5), true, [id=#21]
+Arguments: hashpartitioning(c_last_name#20, c_first_name#19, d_date#16, 5), ENSURE_REQUIREMENTS, [id=#21]
 
 (36) HashAggregate [codegen id : 14]
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
@@ -230,7 +231,7 @@ Results [3]: [c_last_name#20, c_first_name#19, d_date#16]
 
 (37) Exchange
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
-Arguments: hashpartitioning(coalesce(c_last_name#20, ), isnull(c_last_name#20), coalesce(c_first_name#19, ), isnull(c_first_name#19), coalesce(d_date#16, 0), isnull(d_date#16), 5), true, [id=#22]
+Arguments: hashpartitioning(coalesce(c_last_name#20, ), isnull(c_last_name#20), coalesce(c_first_name#19, ), isnull(c_first_name#19), coalesce(d_date#16, 0), isnull(d_date#16), 5), ENSURE_REQUIREMENTS, [id=#22]
 
 (38) Sort [codegen id : 15]
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
@@ -269,7 +270,7 @@ Input [4]: [ws_sold_date_sk#23, ws_bill_customer_sk#24, d_date_sk#25, d_date#26]
 
 (46) Exchange
 Input [2]: [ws_bill_customer_sk#24, d_date#26]
-Arguments: hashpartitioning(ws_bill_customer_sk#24, 5), true, [id=#27]
+Arguments: hashpartitioning(ws_bill_customer_sk#24, 5), ENSURE_REQUIREMENTS, [id=#27]
 
 (47) Sort [codegen id : 18]
 Input [2]: [ws_bill_customer_sk#24, d_date#26]
@@ -300,7 +301,7 @@ Results [3]: [c_last_name#30, c_first_name#29, d_date#26]
 
 (53) Exchange
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
-Arguments: hashpartitioning(c_last_name#30, c_first_name#29, d_date#26, 5), true, [id=#31]
+Arguments: hashpartitioning(c_last_name#30, c_first_name#29, d_date#26, 5), ENSURE_REQUIREMENTS, [id=#31]
 
 (54) HashAggregate [codegen id : 22]
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
@@ -311,7 +312,7 @@ Results [3]: [c_last_name#30, c_first_name#29, d_date#26]
 
 (55) Exchange
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
-Arguments: hashpartitioning(coalesce(c_last_name#30, ), isnull(c_last_name#30), coalesce(c_first_name#29, ), isnull(c_first_name#29), coalesce(d_date#26, 0), isnull(d_date#26), 5), true, [id=#32]
+Arguments: hashpartitioning(coalesce(c_last_name#30, ), isnull(c_last_name#30), coalesce(c_first_name#29, ), isnull(c_first_name#29), coalesce(d_date#26, 0), isnull(d_date#26), 5), ENSURE_REQUIREMENTS, [id=#32]
 
 (56) Sort [codegen id : 23]
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
@@ -322,64 +323,68 @@ Left keys [6]: [coalesce(c_last_name#10, ), isnull(c_last_name#10), coalesce(c_f
 Right keys [6]: [coalesce(c_last_name#30, ), isnull(c_last_name#30), coalesce(c_first_name#29, ), isnull(c_first_name#29), coalesce(d_date#26, 0), isnull(d_date#26)]
 Join condition: None
 
-(58) HashAggregate [codegen id : 24]
+(58) Project [codegen id : 24]
+Output [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Input [3]: [d_date#4, c_first_name#9, c_last_name#10]
+
+(59) HashAggregate [codegen id : 24]
+Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(59) Exchange
+(60) Exchange
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
-Arguments: hashpartitioning(c_last_name#10, c_first_name#9, d_date#4, 5), true, [id=#33]
+Arguments: hashpartitioning(c_last_name#10, c_first_name#9, d_date#4, 5), ENSURE_REQUIREMENTS, [id=#33]
 
-(60) HashAggregate [codegen id : 25]
+(61) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(61) HashAggregate [codegen id : 25]
+(62) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(62) HashAggregate [codegen id : 25]
+(63) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(63) HashAggregate [codegen id : 25]
+(64) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(64) HashAggregate [codegen id : 25]
+(65) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results: []
 
-(65) HashAggregate [codegen id : 25]
+(66) HashAggregate [codegen id : 25]
 Input: []
 Keys: []
 Functions [1]: [partial_count(1)]
 Aggregate Attributes [1]: [count#34]
 Results [1]: [count#35]
 
-(66) Exchange
+(67) Exchange
 Input [1]: [count#35]
-Arguments: SinglePartition, true, [id=#36]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#36]
 
-(67) HashAggregate [codegen id : 26]
+(68) HashAggregate [codegen id : 26]
 Input [1]: [count#35]
 Keys: []
 Functions [1]: [count(1)]
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/simplified.txt
index 8dd5934..015d3c5 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38.sf100/simplified.txt
@@ -13,105 +13,106 @@ WholeStageCodegen (26)
                         Exchange [c_last_name,c_first_name,d_date] #2
                           WholeStageCodegen (24)
                             HashAggregate [c_last_name,c_first_name,d_date]
-                              InputAdapter
-                                SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                              Project [c_last_name,c_first_name,d_date]
+                                InputAdapter
                                   SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
-                                    WholeStageCodegen (7)
-                                      Sort [c_last_name,c_first_name,d_date]
-                                        InputAdapter
-                                          Exchange [c_last_name,c_first_name,d_date] #3
-                                            WholeStageCodegen (6)
-                                              Project [d_date,c_first_name,c_last_name]
-                                                SortMergeJoin [ss_customer_sk,c_customer_sk]
+                                    SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                                      WholeStageCodegen (7)
+                                        Sort [c_last_name,c_first_name,d_date]
+                                          InputAdapter
+                                            Exchange [c_last_name,c_first_name,d_date] #3
+                                              WholeStageCodegen (6)
+                                                Project [d_date,c_first_name,c_last_name]
+                                                  SortMergeJoin [ss_customer_sk,c_customer_sk]
+                                                    InputAdapter
+                                                      WholeStageCodegen (3)
+                                                        Sort [ss_customer_sk]
+                                                          InputAdapter
+                                                            Exchange [ss_customer_sk] #4
+                                                              WholeStageCodegen (2)
+                                                                Project [ss_customer_sk,d_date]
+                                                                  BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
+                                                                    Filter [ss_sold_date_sk,ss_customer_sk]
+                                                                      ColumnarToRow
+                                                                        InputAdapter
+                                                                          Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
+                                                                    InputAdapter
+                                                                      BroadcastExchange #5
+                                                                        WholeStageCodegen (1)
+                                                                          Project [d_date_sk,d_date]
+                                                                            Filter [d_month_seq,d_date_sk]
+                                                                              ColumnarToRow
+                                                                                InputAdapter
+                                                                                  Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
+                                                    InputAdapter
+                                                      WholeStageCodegen (5)
+                                                        Sort [c_customer_sk]
+                                                          InputAdapter
+                                                            Exchange [c_customer_sk] #6
+                                                              WholeStageCodegen (4)
+                                                                Filter [c_customer_sk]
+                                                                  ColumnarToRow
+                                                                    InputAdapter
+                                                                      Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
+                                      WholeStageCodegen (15)
+                                        Sort [c_last_name,c_first_name,d_date]
+                                          InputAdapter
+                                            Exchange [c_last_name,c_first_name,d_date] #7
+                                              WholeStageCodegen (14)
+                                                HashAggregate [c_last_name,c_first_name,d_date]
                                                   InputAdapter
-                                                    WholeStageCodegen (3)
-                                                      Sort [ss_customer_sk]
-                                                        InputAdapter
-                                                          Exchange [ss_customer_sk] #4
-                                                            WholeStageCodegen (2)
-                                                              Project [ss_customer_sk,d_date]
-                                                                BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
-                                                                  Filter [ss_sold_date_sk,ss_customer_sk]
-                                                                    ColumnarToRow
-                                                                      InputAdapter
-                                                                        Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
-                                                                  InputAdapter
-                                                                    BroadcastExchange #5
-                                                                      WholeStageCodegen (1)
-                                                                        Project [d_date_sk,d_date]
-                                                                          Filter [d_month_seq,d_date_sk]
-                                                                            ColumnarToRow
+                                                    Exchange [c_last_name,c_first_name,d_date] #8
+                                                      WholeStageCodegen (13)
+                                                        HashAggregate [c_last_name,c_first_name,d_date]
+                                                          Project [c_last_name,c_first_name,d_date]
+                                                            SortMergeJoin [cs_bill_customer_sk,c_customer_sk]
+                                                              InputAdapter
+                                                                WholeStageCodegen (10)
+                                                                  Sort [cs_bill_customer_sk]
+                                                                    InputAdapter
+                                                                      Exchange [cs_bill_customer_sk] #9
+                                                                        WholeStageCodegen (9)
+                                                                          Project [cs_bill_customer_sk,d_date]
+                                                                            BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
+                                                                              Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                                ColumnarToRow
+                                                                                  InputAdapter
+                                                                                    Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
                                                                               InputAdapter
-                                                                                Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
-                                                  InputAdapter
-                                                    WholeStageCodegen (5)
-                                                      Sort [c_customer_sk]
-                                                        InputAdapter
-                                                          Exchange [c_customer_sk] #6
-                                                            WholeStageCodegen (4)
-                                                              Filter [c_customer_sk]
-                                                                ColumnarToRow
-                                                                  InputAdapter
-                                                                    Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
-                                    WholeStageCodegen (15)
+                                                                                ReusedExchange [d_date_sk,d_date] #5
+                                                              InputAdapter
+                                                                WholeStageCodegen (12)
+                                                                  Sort [c_customer_sk]
+                                                                    InputAdapter
+                                                                      ReusedExchange [c_customer_sk,c_first_name,c_last_name] #6
+                                    WholeStageCodegen (23)
                                       Sort [c_last_name,c_first_name,d_date]
                                         InputAdapter
-                                          Exchange [c_last_name,c_first_name,d_date] #7
-                                            WholeStageCodegen (14)
+                                          Exchange [c_last_name,c_first_name,d_date] #10
+                                            WholeStageCodegen (22)
                                               HashAggregate [c_last_name,c_first_name,d_date]
                                                 InputAdapter
-                                                  Exchange [c_last_name,c_first_name,d_date] #8
-                                                    WholeStageCodegen (13)
+                                                  Exchange [c_last_name,c_first_name,d_date] #11
+                                                    WholeStageCodegen (21)
                                                       HashAggregate [c_last_name,c_first_name,d_date]
                                                         Project [c_last_name,c_first_name,d_date]
-                                                          SortMergeJoin [cs_bill_customer_sk,c_customer_sk]
+                                                          SortMergeJoin [ws_bill_customer_sk,c_customer_sk]
                                                             InputAdapter
-                                                              WholeStageCodegen (10)
-                                                                Sort [cs_bill_customer_sk]
+                                                              WholeStageCodegen (18)
+                                                                Sort [ws_bill_customer_sk]
                                                                   InputAdapter
-                                                                    Exchange [cs_bill_customer_sk] #9
-                                                                      WholeStageCodegen (9)
-                                                                        Project [cs_bill_customer_sk,d_date]
-                                                                          BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
-                                                                            Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                    Exchange [ws_bill_customer_sk] #12
+                                                                      WholeStageCodegen (17)
+                                                                        Project [ws_bill_customer_sk,d_date]
+                                                                          BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
+                                                                            Filter [ws_sold_date_sk,ws_bill_customer_sk]
                                                                               ColumnarToRow
                                                                                 InputAdapter
-                                                                                  Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                                  Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
                                                                             InputAdapter
                                                                               ReusedExchange [d_date_sk,d_date] #5
                                                             InputAdapter
-                                                              WholeStageCodegen (12)
+                                                              WholeStageCodegen (20)
                                                                 Sort [c_customer_sk]
                                                                   InputAdapter
                                                                     ReusedExchange [c_customer_sk,c_first_name,c_last_name] #6
-                                  WholeStageCodegen (23)
-                                    Sort [c_last_name,c_first_name,d_date]
-                                      InputAdapter
-                                        Exchange [c_last_name,c_first_name,d_date] #10
-                                          WholeStageCodegen (22)
-                                            HashAggregate [c_last_name,c_first_name,d_date]
-                                              InputAdapter
-                                                Exchange [c_last_name,c_first_name,d_date] #11
-                                                  WholeStageCodegen (21)
-                                                    HashAggregate [c_last_name,c_first_name,d_date]
-                                                      Project [c_last_name,c_first_name,d_date]
-                                                        SortMergeJoin [ws_bill_customer_sk,c_customer_sk]
-                                                          InputAdapter
-                                                            WholeStageCodegen (18)
-                                                              Sort [ws_bill_customer_sk]
-                                                                InputAdapter
-                                                                  Exchange [ws_bill_customer_sk] #12
-                                                                    WholeStageCodegen (17)
-                                                                      Project [ws_bill_customer_sk,d_date]
-                                                                        BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
-                                                                          Filter [ws_sold_date_sk,ws_bill_customer_sk]
-                                                                            ColumnarToRow
-                                                                              InputAdapter
-                                                                                Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
-                                                                          InputAdapter
-                                                                            ReusedExchange [d_date_sk,d_date] #5
-                                                          InputAdapter
-                                                            WholeStageCodegen (20)
-                                                              Sort [c_customer_sk]
-                                                                InputAdapter
-                                                                  ReusedExchange [c_customer_sk,c_first_name,c_last_name] #6
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/explain.txt
index 74454cf..05b4c6b 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/explain.txt
@@ -1,58 +1,59 @@
 == Physical Plan ==
-* HashAggregate (54)
-+- Exchange (53)
-   +- * HashAggregate (52)
-      +- * HashAggregate (51)
-         +- * HashAggregate (50)
-            +- * HashAggregate (49)
-               +- * HashAggregate (48)
-                  +- * HashAggregate (47)
-                     +- Exchange (46)
-                        +- * HashAggregate (45)
-                           +- * BroadcastHashJoin LeftSemi BuildRight (44)
-                              :- * BroadcastHashJoin LeftSemi BuildRight (30)
-                              :  :- * Project (16)
-                              :  :  +- * BroadcastHashJoin Inner BuildRight (15)
-                              :  :     :- * Project (10)
-                              :  :     :  +- * BroadcastHashJoin Inner BuildRight (9)
-                              :  :     :     :- * Filter (3)
-                              :  :     :     :  +- * ColumnarToRow (2)
-                              :  :     :     :     +- Scan parquet default.store_sales (1)
-                              :  :     :     +- BroadcastExchange (8)
-                              :  :     :        +- * Project (7)
-                              :  :     :           +- * Filter (6)
-                              :  :     :              +- * ColumnarToRow (5)
-                              :  :     :                 +- Scan parquet default.date_dim (4)
-                              :  :     +- BroadcastExchange (14)
-                              :  :        +- * Filter (13)
-                              :  :           +- * ColumnarToRow (12)
-                              :  :              +- Scan parquet default.customer (11)
-                              :  +- BroadcastExchange (29)
-                              :     +- * HashAggregate (28)
-                              :        +- Exchange (27)
-                              :           +- * HashAggregate (26)
-                              :              +- * Project (25)
-                              :                 +- * BroadcastHashJoin Inner BuildRight (24)
-                              :                    :- * Project (22)
-                              :                    :  +- * BroadcastHashJoin Inner BuildRight (21)
-                              :                    :     :- * Filter (19)
-                              :                    :     :  +- * ColumnarToRow (18)
-                              :                    :     :     +- Scan parquet default.catalog_sales (17)
-                              :                    :     +- ReusedExchange (20)
-                              :                    +- ReusedExchange (23)
-                              +- BroadcastExchange (43)
-                                 +- * HashAggregate (42)
-                                    +- Exchange (41)
-                                       +- * HashAggregate (40)
-                                          +- * Project (39)
-                                             +- * BroadcastHashJoin Inner BuildRight (38)
-                                                :- * Project (36)
-                                                :  +- * BroadcastHashJoin Inner BuildRight (35)
-                                                :     :- * Filter (33)
-                                                :     :  +- * ColumnarToRow (32)
-                                                :     :     +- Scan parquet default.web_sales (31)
-                                                :     +- ReusedExchange (34)
-                                                +- ReusedExchange (37)
+* HashAggregate (55)
++- Exchange (54)
+   +- * HashAggregate (53)
+      +- * HashAggregate (52)
+         +- * HashAggregate (51)
+            +- * HashAggregate (50)
+               +- * HashAggregate (49)
+                  +- * HashAggregate (48)
+                     +- Exchange (47)
+                        +- * HashAggregate (46)
+                           +- * Project (45)
+                              +- * BroadcastHashJoin LeftSemi BuildRight (44)
+                                 :- * BroadcastHashJoin LeftSemi BuildRight (30)
+                                 :  :- * Project (16)
+                                 :  :  +- * BroadcastHashJoin Inner BuildRight (15)
+                                 :  :     :- * Project (10)
+                                 :  :     :  +- * BroadcastHashJoin Inner BuildRight (9)
+                                 :  :     :     :- * Filter (3)
+                                 :  :     :     :  +- * ColumnarToRow (2)
+                                 :  :     :     :     +- Scan parquet default.store_sales (1)
+                                 :  :     :     +- BroadcastExchange (8)
+                                 :  :     :        +- * Project (7)
+                                 :  :     :           +- * Filter (6)
+                                 :  :     :              +- * ColumnarToRow (5)
+                                 :  :     :                 +- Scan parquet default.date_dim (4)
+                                 :  :     +- BroadcastExchange (14)
+                                 :  :        +- * Filter (13)
+                                 :  :           +- * ColumnarToRow (12)
+                                 :  :              +- Scan parquet default.customer (11)
+                                 :  +- BroadcastExchange (29)
+                                 :     +- * HashAggregate (28)
+                                 :        +- Exchange (27)
+                                 :           +- * HashAggregate (26)
+                                 :              +- * Project (25)
+                                 :                 +- * BroadcastHashJoin Inner BuildRight (24)
+                                 :                    :- * Project (22)
+                                 :                    :  +- * BroadcastHashJoin Inner BuildRight (21)
+                                 :                    :     :- * Filter (19)
+                                 :                    :     :  +- * ColumnarToRow (18)
+                                 :                    :     :     +- Scan parquet default.catalog_sales (17)
+                                 :                    :     +- ReusedExchange (20)
+                                 :                    +- ReusedExchange (23)
+                                 +- BroadcastExchange (43)
+                                    +- * HashAggregate (42)
+                                       +- Exchange (41)
+                                          +- * HashAggregate (40)
+                                             +- * Project (39)
+                                                +- * BroadcastHashJoin Inner BuildRight (38)
+                                                   :- * Project (36)
+                                                   :  +- * BroadcastHashJoin Inner BuildRight (35)
+                                                   :     :- * Filter (33)
+                                                   :     :  +- * ColumnarToRow (32)
+                                                   :     :     +- Scan parquet default.web_sales (31)
+                                                   :     +- ReusedExchange (34)
+                                                   +- ReusedExchange (37)
 
 
 (1) Scan parquet default.store_sales
@@ -174,7 +175,7 @@ Results [3]: [c_last_name#17, c_first_name#16, d_date#14]
 
 (27) Exchange
 Input [3]: [c_last_name#17, c_first_name#16, d_date#14]
-Arguments: hashpartitioning(c_last_name#17, c_first_name#16, d_date#14, 5), true, [id=#18]
+Arguments: hashpartitioning(c_last_name#17, c_first_name#16, d_date#14, 5), ENSURE_REQUIREMENTS, [id=#18]
 
 (28) HashAggregate [codegen id : 6]
 Input [3]: [c_last_name#17, c_first_name#16, d_date#14]
@@ -239,7 +240,7 @@ Results [3]: [c_last_name#26, c_first_name#25, d_date#23]
 
 (41) Exchange
 Input [3]: [c_last_name#26, c_first_name#25, d_date#23]
-Arguments: hashpartitioning(c_last_name#26, c_first_name#25, d_date#23, 5), true, [id=#27]
+Arguments: hashpartitioning(c_last_name#26, c_first_name#25, d_date#23, 5), ENSURE_REQUIREMENTS, [id=#27]
 
 (42) HashAggregate [codegen id : 10]
 Input [3]: [c_last_name#26, c_first_name#25, d_date#23]
@@ -257,64 +258,68 @@ Left keys [6]: [coalesce(c_last_name#9, ), isnull(c_last_name#9), coalesce(c_fir
 Right keys [6]: [coalesce(c_last_name#26, ), isnull(c_last_name#26), coalesce(c_first_name#25, ), isnull(c_first_name#25), coalesce(d_date#23, 0), isnull(d_date#23)]
 Join condition: None
 
-(45) HashAggregate [codegen id : 11]
+(45) Project [codegen id : 11]
+Output [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Input [3]: [d_date#4, c_first_name#8, c_last_name#9]
+
+(46) HashAggregate [codegen id : 11]
+Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(46) Exchange
+(47) Exchange
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
-Arguments: hashpartitioning(c_last_name#9, c_first_name#8, d_date#4, 5), true, [id=#29]
+Arguments: hashpartitioning(c_last_name#9, c_first_name#8, d_date#4, 5), ENSURE_REQUIREMENTS, [id=#29]
 
-(47) HashAggregate [codegen id : 12]
+(48) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(48) HashAggregate [codegen id : 12]
+(49) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(49) HashAggregate [codegen id : 12]
+(50) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(50) HashAggregate [codegen id : 12]
+(51) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(51) HashAggregate [codegen id : 12]
+(52) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results: []
 
-(52) HashAggregate [codegen id : 12]
+(53) HashAggregate [codegen id : 12]
 Input: []
 Keys: []
 Functions [1]: [partial_count(1)]
 Aggregate Attributes [1]: [count#30]
 Results [1]: [count#31]
 
-(53) Exchange
+(54) Exchange
 Input [1]: [count#31]
-Arguments: SinglePartition, true, [id=#32]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
 
-(54) HashAggregate [codegen id : 13]
+(55) HashAggregate [codegen id : 13]
 Input [1]: [count#31]
 Keys: []
 Functions [1]: [count(1)]
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/simplified.txt
index a5b57a4..0f32bfb 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q38/simplified.txt
@@ -13,68 +13,69 @@ WholeStageCodegen (13)
                         Exchange [c_last_name,c_first_name,d_date] #2
                           WholeStageCodegen (11)
                             HashAggregate [c_last_name,c_first_name,d_date]
-                              BroadcastHashJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                              Project [c_last_name,c_first_name,d_date]
                                 BroadcastHashJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
-                                  Project [d_date,c_first_name,c_last_name]
-                                    BroadcastHashJoin [ss_customer_sk,c_customer_sk]
-                                      Project [ss_customer_sk,d_date]
-                                        BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
-                                          Filter [ss_sold_date_sk,ss_customer_sk]
-                                            ColumnarToRow
-                                              InputAdapter
-                                                Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
-                                          InputAdapter
-                                            BroadcastExchange #3
-                                              WholeStageCodegen (1)
-                                                Project [d_date_sk,d_date]
-                                                  Filter [d_month_seq,d_date_sk]
-                                                    ColumnarToRow
-                                                      InputAdapter
-                                                        Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
-                                      InputAdapter
-                                        BroadcastExchange #4
-                                          WholeStageCodegen (2)
-                                            Filter [c_customer_sk]
+                                  BroadcastHashJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                                    Project [d_date,c_first_name,c_last_name]
+                                      BroadcastHashJoin [ss_customer_sk,c_customer_sk]
+                                        Project [ss_customer_sk,d_date]
+                                          BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
+                                            Filter [ss_sold_date_sk,ss_customer_sk]
                                               ColumnarToRow
                                                 InputAdapter
-                                                  Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
+                                                  Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
+                                            InputAdapter
+                                              BroadcastExchange #3
+                                                WholeStageCodegen (1)
+                                                  Project [d_date_sk,d_date]
+                                                    Filter [d_month_seq,d_date_sk]
+                                                      ColumnarToRow
+                                                        InputAdapter
+                                                          Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
+                                        InputAdapter
+                                          BroadcastExchange #4
+                                            WholeStageCodegen (2)
+                                              Filter [c_customer_sk]
+                                                ColumnarToRow
+                                                  InputAdapter
+                                                    Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
+                                    InputAdapter
+                                      BroadcastExchange #5
+                                        WholeStageCodegen (6)
+                                          HashAggregate [c_last_name,c_first_name,d_date]
+                                            InputAdapter
+                                              Exchange [c_last_name,c_first_name,d_date] #6
+                                                WholeStageCodegen (5)
+                                                  HashAggregate [c_last_name,c_first_name,d_date]
+                                                    Project [c_last_name,c_first_name,d_date]
+                                                      BroadcastHashJoin [cs_bill_customer_sk,c_customer_sk]
+                                                        Project [cs_bill_customer_sk,d_date]
+                                                          BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
+                                                            Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                              ColumnarToRow
+                                                                InputAdapter
+                                                                  Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
+                                                            InputAdapter
+                                                              ReusedExchange [d_date_sk,d_date] #3
+                                                        InputAdapter
+                                                          ReusedExchange [c_customer_sk,c_first_name,c_last_name] #4
                                   InputAdapter
-                                    BroadcastExchange #5
-                                      WholeStageCodegen (6)
+                                    BroadcastExchange #7
+                                      WholeStageCodegen (10)
                                         HashAggregate [c_last_name,c_first_name,d_date]
                                           InputAdapter
-                                            Exchange [c_last_name,c_first_name,d_date] #6
-                                              WholeStageCodegen (5)
+                                            Exchange [c_last_name,c_first_name,d_date] #8
+                                              WholeStageCodegen (9)
                                                 HashAggregate [c_last_name,c_first_name,d_date]
                                                   Project [c_last_name,c_first_name,d_date]
-                                                    BroadcastHashJoin [cs_bill_customer_sk,c_customer_sk]
-                                                      Project [cs_bill_customer_sk,d_date]
-                                                        BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
-                                                          Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                    BroadcastHashJoin [ws_bill_customer_sk,c_customer_sk]
+                                                      Project [ws_bill_customer_sk,d_date]
+                                                        BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
+                                                          Filter [ws_sold_date_sk,ws_bill_customer_sk]
                                                             ColumnarToRow
                                                               InputAdapter
-                                                                Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
                                                           InputAdapter
                                                             ReusedExchange [d_date_sk,d_date] #3
                                                       InputAdapter
                                                         ReusedExchange [c_customer_sk,c_first_name,c_last_name] #4
-                                InputAdapter
-                                  BroadcastExchange #7
-                                    WholeStageCodegen (10)
-                                      HashAggregate [c_last_name,c_first_name,d_date]
-                                        InputAdapter
-                                          Exchange [c_last_name,c_first_name,d_date] #8
-                                            WholeStageCodegen (9)
-                                              HashAggregate [c_last_name,c_first_name,d_date]
-                                                Project [c_last_name,c_first_name,d_date]
-                                                  BroadcastHashJoin [ws_bill_customer_sk,c_customer_sk]
-                                                    Project [ws_bill_customer_sk,d_date]
-                                                      BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
-                                                        Filter [ws_sold_date_sk,ws_bill_customer_sk]
-                                                          ColumnarToRow
-                                                            InputAdapter
-                                                              Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
-                                                        InputAdapter
-                                                          ReusedExchange [d_date_sk,d_date] #3
-                                                    InputAdapter
-                                                      ReusedExchange [c_customer_sk,c_first_name,c_last_name] #4
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt
index 377bd36..e635f02 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/explain.txt
@@ -1,71 +1,72 @@
 == Physical Plan ==
-* HashAggregate (67)
-+- Exchange (66)
-   +- * HashAggregate (65)
-      +- * HashAggregate (64)
-         +- * HashAggregate (63)
-            +- * HashAggregate (62)
-               +- * HashAggregate (61)
-                  +- * HashAggregate (60)
-                     +- Exchange (59)
-                        +- * HashAggregate (58)
-                           +- SortMergeJoin LeftAnti (57)
-                              :- SortMergeJoin LeftAnti (39)
-                              :  :- * Sort (21)
-                              :  :  +- Exchange (20)
-                              :  :     +- * Project (19)
-                              :  :        +- * SortMergeJoin Inner (18)
-                              :  :           :- * Sort (12)
-                              :  :           :  +- Exchange (11)
-                              :  :           :     +- * Project (10)
-                              :  :           :        +- * BroadcastHashJoin Inner BuildRight (9)
-                              :  :           :           :- * Filter (3)
-                              :  :           :           :  +- * ColumnarToRow (2)
-                              :  :           :           :     +- Scan parquet default.store_sales (1)
-                              :  :           :           +- BroadcastExchange (8)
-                              :  :           :              +- * Project (7)
-                              :  :           :                 +- * Filter (6)
-                              :  :           :                    +- * ColumnarToRow (5)
-                              :  :           :                       +- Scan parquet default.date_dim (4)
-                              :  :           +- * Sort (17)
-                              :  :              +- Exchange (16)
-                              :  :                 +- * Filter (15)
-                              :  :                    +- * ColumnarToRow (14)
-                              :  :                       +- Scan parquet default.customer (13)
-                              :  +- * Sort (38)
-                              :     +- Exchange (37)
-                              :        +- * HashAggregate (36)
-                              :           +- Exchange (35)
-                              :              +- * HashAggregate (34)
-                              :                 +- * Project (33)
-                              :                    +- * SortMergeJoin Inner (32)
-                              :                       :- * Sort (29)
-                              :                       :  +- Exchange (28)
-                              :                       :     +- * Project (27)
-                              :                       :        +- * BroadcastHashJoin Inner BuildRight (26)
-                              :                       :           :- * Filter (24)
-                              :                       :           :  +- * ColumnarToRow (23)
-                              :                       :           :     +- Scan parquet default.catalog_sales (22)
-                              :                       :           +- ReusedExchange (25)
-                              :                       +- * Sort (31)
-                              :                          +- ReusedExchange (30)
-                              +- * Sort (56)
-                                 +- Exchange (55)
-                                    +- * HashAggregate (54)
-                                       +- Exchange (53)
-                                          +- * HashAggregate (52)
-                                             +- * Project (51)
-                                                +- * SortMergeJoin Inner (50)
-                                                   :- * Sort (47)
-                                                   :  +- Exchange (46)
-                                                   :     +- * Project (45)
-                                                   :        +- * BroadcastHashJoin Inner BuildRight (44)
-                                                   :           :- * Filter (42)
-                                                   :           :  +- * ColumnarToRow (41)
-                                                   :           :     +- Scan parquet default.web_sales (40)
-                                                   :           +- ReusedExchange (43)
-                                                   +- * Sort (49)
-                                                      +- ReusedExchange (48)
+* HashAggregate (68)
++- Exchange (67)
+   +- * HashAggregate (66)
+      +- * HashAggregate (65)
+         +- * HashAggregate (64)
+            +- * HashAggregate (63)
+               +- * HashAggregate (62)
+                  +- * HashAggregate (61)
+                     +- Exchange (60)
+                        +- * HashAggregate (59)
+                           +- * Project (58)
+                              +- SortMergeJoin LeftAnti (57)
+                                 :- SortMergeJoin LeftAnti (39)
+                                 :  :- * Sort (21)
+                                 :  :  +- Exchange (20)
+                                 :  :     +- * Project (19)
+                                 :  :        +- * SortMergeJoin Inner (18)
+                                 :  :           :- * Sort (12)
+                                 :  :           :  +- Exchange (11)
+                                 :  :           :     +- * Project (10)
+                                 :  :           :        +- * BroadcastHashJoin Inner BuildRight (9)
+                                 :  :           :           :- * Filter (3)
+                                 :  :           :           :  +- * ColumnarToRow (2)
+                                 :  :           :           :     +- Scan parquet default.store_sales (1)
+                                 :  :           :           +- BroadcastExchange (8)
+                                 :  :           :              +- * Project (7)
+                                 :  :           :                 +- * Filter (6)
+                                 :  :           :                    +- * ColumnarToRow (5)
+                                 :  :           :                       +- Scan parquet default.date_dim (4)
+                                 :  :           +- * Sort (17)
+                                 :  :              +- Exchange (16)
+                                 :  :                 +- * Filter (15)
+                                 :  :                    +- * ColumnarToRow (14)
+                                 :  :                       +- Scan parquet default.customer (13)
+                                 :  +- * Sort (38)
+                                 :     +- Exchange (37)
+                                 :        +- * HashAggregate (36)
+                                 :           +- Exchange (35)
+                                 :              +- * HashAggregate (34)
+                                 :                 +- * Project (33)
+                                 :                    +- * SortMergeJoin Inner (32)
+                                 :                       :- * Sort (29)
+                                 :                       :  +- Exchange (28)
+                                 :                       :     +- * Project (27)
+                                 :                       :        +- * BroadcastHashJoin Inner BuildRight (26)
+                                 :                       :           :- * Filter (24)
+                                 :                       :           :  +- * ColumnarToRow (23)
+                                 :                       :           :     +- Scan parquet default.catalog_sales (22)
+                                 :                       :           +- ReusedExchange (25)
+                                 :                       +- * Sort (31)
+                                 :                          +- ReusedExchange (30)
+                                 +- * Sort (56)
+                                    +- Exchange (55)
+                                       +- * HashAggregate (54)
+                                          +- Exchange (53)
+                                             +- * HashAggregate (52)
+                                                +- * Project (51)
+                                                   +- * SortMergeJoin Inner (50)
+                                                      :- * Sort (47)
+                                                      :  +- Exchange (46)
+                                                      :     +- * Project (45)
+                                                      :        +- * BroadcastHashJoin Inner BuildRight (44)
+                                                      :           :- * Filter (42)
+                                                      :           :  +- * ColumnarToRow (41)
+                                                      :           :     +- Scan parquet default.web_sales (40)
+                                                      :           +- ReusedExchange (43)
+                                                      +- * Sort (49)
+                                                         +- ReusedExchange (48)
 
 
 (1) Scan parquet default.store_sales
@@ -115,7 +116,7 @@ Input [4]: [ss_sold_date_sk#1, ss_customer_sk#2, d_date_sk#3, d_date#4]
 
 (11) Exchange
 Input [2]: [ss_customer_sk#2, d_date#4]
-Arguments: hashpartitioning(ss_customer_sk#2, 5), true, [id=#7]
+Arguments: hashpartitioning(ss_customer_sk#2, 5), ENSURE_REQUIREMENTS, [id=#7]
 
 (12) Sort [codegen id : 3]
 Input [2]: [ss_customer_sk#2, d_date#4]
@@ -137,7 +138,7 @@ Condition : isnotnull(c_customer_sk#8)
 
 (16) Exchange
 Input [3]: [c_customer_sk#8, c_first_name#9, c_last_name#10]
-Arguments: hashpartitioning(c_customer_sk#8, 5), true, [id=#11]
+Arguments: hashpartitioning(c_customer_sk#8, 5), ENSURE_REQUIREMENTS, [id=#11]
 
 (17) Sort [codegen id : 5]
 Input [3]: [c_customer_sk#8, c_first_name#9, c_last_name#10]
@@ -154,7 +155,7 @@ Input [5]: [ss_customer_sk#2, d_date#4, c_customer_sk#8, c_first_name#9, c_last_
 
 (20) Exchange
 Input [3]: [d_date#4, c_first_name#9, c_last_name#10]
-Arguments: hashpartitioning(coalesce(c_last_name#10, ), isnull(c_last_name#10), coalesce(c_first_name#9, ), isnull(c_first_name#9), coalesce(d_date#4, 0), isnull(d_date#4), 5), true, [id=#12]
+Arguments: hashpartitioning(coalesce(c_last_name#10, ), isnull(c_last_name#10), coalesce(c_first_name#9, ), isnull(c_first_name#9), coalesce(d_date#4, 0), isnull(d_date#4), 5), ENSURE_REQUIREMENTS, [id=#12]
 
 (21) Sort [codegen id : 7]
 Input [3]: [d_date#4, c_first_name#9, c_last_name#10]
@@ -188,7 +189,7 @@ Input [4]: [cs_sold_date_sk#13, cs_bill_customer_sk#14, d_date_sk#15, d_date#16]
 
 (28) Exchange
 Input [2]: [cs_bill_customer_sk#14, d_date#16]
-Arguments: hashpartitioning(cs_bill_customer_sk#14, 5), true, [id=#17]
+Arguments: hashpartitioning(cs_bill_customer_sk#14, 5), ENSURE_REQUIREMENTS, [id=#17]
 
 (29) Sort [codegen id : 10]
 Input [2]: [cs_bill_customer_sk#14, d_date#16]
@@ -219,7 +220,7 @@ Results [3]: [c_last_name#20, c_first_name#19, d_date#16]
 
 (35) Exchange
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
-Arguments: hashpartitioning(c_last_name#20, c_first_name#19, d_date#16, 5), true, [id=#21]
+Arguments: hashpartitioning(c_last_name#20, c_first_name#19, d_date#16, 5), ENSURE_REQUIREMENTS, [id=#21]
 
 (36) HashAggregate [codegen id : 14]
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
@@ -230,7 +231,7 @@ Results [3]: [c_last_name#20, c_first_name#19, d_date#16]
 
 (37) Exchange
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
-Arguments: hashpartitioning(coalesce(c_last_name#20, ), isnull(c_last_name#20), coalesce(c_first_name#19, ), isnull(c_first_name#19), coalesce(d_date#16, 0), isnull(d_date#16), 5), true, [id=#22]
+Arguments: hashpartitioning(coalesce(c_last_name#20, ), isnull(c_last_name#20), coalesce(c_first_name#19, ), isnull(c_first_name#19), coalesce(d_date#16, 0), isnull(d_date#16), 5), ENSURE_REQUIREMENTS, [id=#22]
 
 (38) Sort [codegen id : 15]
 Input [3]: [c_last_name#20, c_first_name#19, d_date#16]
@@ -269,7 +270,7 @@ Input [4]: [ws_sold_date_sk#23, ws_bill_customer_sk#24, d_date_sk#25, d_date#26]
 
 (46) Exchange
 Input [2]: [ws_bill_customer_sk#24, d_date#26]
-Arguments: hashpartitioning(ws_bill_customer_sk#24, 5), true, [id=#27]
+Arguments: hashpartitioning(ws_bill_customer_sk#24, 5), ENSURE_REQUIREMENTS, [id=#27]
 
 (47) Sort [codegen id : 18]
 Input [2]: [ws_bill_customer_sk#24, d_date#26]
@@ -300,7 +301,7 @@ Results [3]: [c_last_name#30, c_first_name#29, d_date#26]
 
 (53) Exchange
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
-Arguments: hashpartitioning(c_last_name#30, c_first_name#29, d_date#26, 5), true, [id=#31]
+Arguments: hashpartitioning(c_last_name#30, c_first_name#29, d_date#26, 5), ENSURE_REQUIREMENTS, [id=#31]
 
 (54) HashAggregate [codegen id : 22]
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
@@ -311,7 +312,7 @@ Results [3]: [c_last_name#30, c_first_name#29, d_date#26]
 
 (55) Exchange
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
-Arguments: hashpartitioning(coalesce(c_last_name#30, ), isnull(c_last_name#30), coalesce(c_first_name#29, ), isnull(c_first_name#29), coalesce(d_date#26, 0), isnull(d_date#26), 5), true, [id=#32]
+Arguments: hashpartitioning(coalesce(c_last_name#30, ), isnull(c_last_name#30), coalesce(c_first_name#29, ), isnull(c_first_name#29), coalesce(d_date#26, 0), isnull(d_date#26), 5), ENSURE_REQUIREMENTS, [id=#32]
 
 (56) Sort [codegen id : 23]
 Input [3]: [c_last_name#30, c_first_name#29, d_date#26]
@@ -322,64 +323,68 @@ Left keys [6]: [coalesce(c_last_name#10, ), isnull(c_last_name#10), coalesce(c_f
 Right keys [6]: [coalesce(c_last_name#30, ), isnull(c_last_name#30), coalesce(c_first_name#29, ), isnull(c_first_name#29), coalesce(d_date#26, 0), isnull(d_date#26)]
 Join condition: None
 
-(58) HashAggregate [codegen id : 24]
+(58) Project [codegen id : 24]
+Output [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Input [3]: [d_date#4, c_first_name#9, c_last_name#10]
+
+(59) HashAggregate [codegen id : 24]
+Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(59) Exchange
+(60) Exchange
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
-Arguments: hashpartitioning(c_last_name#10, c_first_name#9, d_date#4, 5), true, [id=#33]
+Arguments: hashpartitioning(c_last_name#10, c_first_name#9, d_date#4, 5), ENSURE_REQUIREMENTS, [id=#33]
 
-(60) HashAggregate [codegen id : 25]
+(61) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(61) HashAggregate [codegen id : 25]
+(62) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(62) HashAggregate [codegen id : 25]
+(63) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(63) HashAggregate [codegen id : 25]
+(64) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#10, c_first_name#9, d_date#4]
 
-(64) HashAggregate [codegen id : 25]
+(65) HashAggregate [codegen id : 25]
 Input [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Keys [3]: [c_last_name#10, c_first_name#9, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results: []
 
-(65) HashAggregate [codegen id : 25]
+(66) HashAggregate [codegen id : 25]
 Input: []
 Keys: []
 Functions [1]: [partial_count(1)]
 Aggregate Attributes [1]: [count#34]
 Results [1]: [count#35]
 
-(66) Exchange
+(67) Exchange
 Input [1]: [count#35]
-Arguments: SinglePartition, true, [id=#36]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#36]
 
-(67) HashAggregate [codegen id : 26]
+(68) HashAggregate [codegen id : 26]
 Input [1]: [count#35]
 Keys: []
 Functions [1]: [count(1)]
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt
index 8dd5934..015d3c5 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87.sf100/simplified.txt
@@ -13,105 +13,106 @@ WholeStageCodegen (26)
                         Exchange [c_last_name,c_first_name,d_date] #2
                           WholeStageCodegen (24)
                             HashAggregate [c_last_name,c_first_name,d_date]
-                              InputAdapter
-                                SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                              Project [c_last_name,c_first_name,d_date]
+                                InputAdapter
                                   SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
-                                    WholeStageCodegen (7)
-                                      Sort [c_last_name,c_first_name,d_date]
-                                        InputAdapter
-                                          Exchange [c_last_name,c_first_name,d_date] #3
-                                            WholeStageCodegen (6)
-                                              Project [d_date,c_first_name,c_last_name]
-                                                SortMergeJoin [ss_customer_sk,c_customer_sk]
+                                    SortMergeJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                                      WholeStageCodegen (7)
+                                        Sort [c_last_name,c_first_name,d_date]
+                                          InputAdapter
+                                            Exchange [c_last_name,c_first_name,d_date] #3
+                                              WholeStageCodegen (6)
+                                                Project [d_date,c_first_name,c_last_name]
+                                                  SortMergeJoin [ss_customer_sk,c_customer_sk]
+                                                    InputAdapter
+                                                      WholeStageCodegen (3)
+                                                        Sort [ss_customer_sk]
+                                                          InputAdapter
+                                                            Exchange [ss_customer_sk] #4
+                                                              WholeStageCodegen (2)
+                                                                Project [ss_customer_sk,d_date]
+                                                                  BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
+                                                                    Filter [ss_sold_date_sk,ss_customer_sk]
+                                                                      ColumnarToRow
+                                                                        InputAdapter
+                                                                          Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
+                                                                    InputAdapter
+                                                                      BroadcastExchange #5
+                                                                        WholeStageCodegen (1)
+                                                                          Project [d_date_sk,d_date]
+                                                                            Filter [d_month_seq,d_date_sk]
+                                                                              ColumnarToRow
+                                                                                InputAdapter
+                                                                                  Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
+                                                    InputAdapter
+                                                      WholeStageCodegen (5)
+                                                        Sort [c_customer_sk]
+                                                          InputAdapter
+                                                            Exchange [c_customer_sk] #6
+                                                              WholeStageCodegen (4)
+                                                                Filter [c_customer_sk]
+                                                                  ColumnarToRow
+                                                                    InputAdapter
+                                                                      Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
+                                      WholeStageCodegen (15)
+                                        Sort [c_last_name,c_first_name,d_date]
+                                          InputAdapter
+                                            Exchange [c_last_name,c_first_name,d_date] #7
+                                              WholeStageCodegen (14)
+                                                HashAggregate [c_last_name,c_first_name,d_date]
                                                   InputAdapter
-                                                    WholeStageCodegen (3)
-                                                      Sort [ss_customer_sk]
-                                                        InputAdapter
-                                                          Exchange [ss_customer_sk] #4
-                                                            WholeStageCodegen (2)
-                                                              Project [ss_customer_sk,d_date]
-                                                                BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
-                                                                  Filter [ss_sold_date_sk,ss_customer_sk]
-                                                                    ColumnarToRow
-                                                                      InputAdapter
-                                                                        Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
-                                                                  InputAdapter
-                                                                    BroadcastExchange #5
-                                                                      WholeStageCodegen (1)
-                                                                        Project [d_date_sk,d_date]
-                                                                          Filter [d_month_seq,d_date_sk]
-                                                                            ColumnarToRow
+                                                    Exchange [c_last_name,c_first_name,d_date] #8
+                                                      WholeStageCodegen (13)
+                                                        HashAggregate [c_last_name,c_first_name,d_date]
+                                                          Project [c_last_name,c_first_name,d_date]
+                                                            SortMergeJoin [cs_bill_customer_sk,c_customer_sk]
+                                                              InputAdapter
+                                                                WholeStageCodegen (10)
+                                                                  Sort [cs_bill_customer_sk]
+                                                                    InputAdapter
+                                                                      Exchange [cs_bill_customer_sk] #9
+                                                                        WholeStageCodegen (9)
+                                                                          Project [cs_bill_customer_sk,d_date]
+                                                                            BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
+                                                                              Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                                ColumnarToRow
+                                                                                  InputAdapter
+                                                                                    Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
                                                                               InputAdapter
-                                                                                Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
-                                                  InputAdapter
-                                                    WholeStageCodegen (5)
-                                                      Sort [c_customer_sk]
-                                                        InputAdapter
-                                                          Exchange [c_customer_sk] #6
-                                                            WholeStageCodegen (4)
-                                                              Filter [c_customer_sk]
-                                                                ColumnarToRow
-                                                                  InputAdapter
-                                                                    Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
-                                    WholeStageCodegen (15)
+                                                                                ReusedExchange [d_date_sk,d_date] #5
+                                                              InputAdapter
+                                                                WholeStageCodegen (12)
+                                                                  Sort [c_customer_sk]
+                                                                    InputAdapter
+                                                                      ReusedExchange [c_customer_sk,c_first_name,c_last_name] #6
+                                    WholeStageCodegen (23)
                                       Sort [c_last_name,c_first_name,d_date]
                                         InputAdapter
-                                          Exchange [c_last_name,c_first_name,d_date] #7
-                                            WholeStageCodegen (14)
+                                          Exchange [c_last_name,c_first_name,d_date] #10
+                                            WholeStageCodegen (22)
                                               HashAggregate [c_last_name,c_first_name,d_date]
                                                 InputAdapter
-                                                  Exchange [c_last_name,c_first_name,d_date] #8
-                                                    WholeStageCodegen (13)
+                                                  Exchange [c_last_name,c_first_name,d_date] #11
+                                                    WholeStageCodegen (21)
                                                       HashAggregate [c_last_name,c_first_name,d_date]
                                                         Project [c_last_name,c_first_name,d_date]
-                                                          SortMergeJoin [cs_bill_customer_sk,c_customer_sk]
+                                                          SortMergeJoin [ws_bill_customer_sk,c_customer_sk]
                                                             InputAdapter
-                                                              WholeStageCodegen (10)
-                                                                Sort [cs_bill_customer_sk]
+                                                              WholeStageCodegen (18)
+                                                                Sort [ws_bill_customer_sk]
                                                                   InputAdapter
-                                                                    Exchange [cs_bill_customer_sk] #9
-                                                                      WholeStageCodegen (9)
-                                                                        Project [cs_bill_customer_sk,d_date]
-                                                                          BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
-                                                                            Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                    Exchange [ws_bill_customer_sk] #12
+                                                                      WholeStageCodegen (17)
+                                                                        Project [ws_bill_customer_sk,d_date]
+                                                                          BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
+                                                                            Filter [ws_sold_date_sk,ws_bill_customer_sk]
                                                                               ColumnarToRow
                                                                                 InputAdapter
-                                                                                  Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                                  Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
                                                                             InputAdapter
                                                                               ReusedExchange [d_date_sk,d_date] #5
                                                             InputAdapter
-                                                              WholeStageCodegen (12)
+                                                              WholeStageCodegen (20)
                                                                 Sort [c_customer_sk]
                                                                   InputAdapter
                                                                     ReusedExchange [c_customer_sk,c_first_name,c_last_name] #6
-                                  WholeStageCodegen (23)
-                                    Sort [c_last_name,c_first_name,d_date]
-                                      InputAdapter
-                                        Exchange [c_last_name,c_first_name,d_date] #10
-                                          WholeStageCodegen (22)
-                                            HashAggregate [c_last_name,c_first_name,d_date]
-                                              InputAdapter
-                                                Exchange [c_last_name,c_first_name,d_date] #11
-                                                  WholeStageCodegen (21)
-                                                    HashAggregate [c_last_name,c_first_name,d_date]
-                                                      Project [c_last_name,c_first_name,d_date]
-                                                        SortMergeJoin [ws_bill_customer_sk,c_customer_sk]
-                                                          InputAdapter
-                                                            WholeStageCodegen (18)
-                                                              Sort [ws_bill_customer_sk]
-                                                                InputAdapter
-                                                                  Exchange [ws_bill_customer_sk] #12
-                                                                    WholeStageCodegen (17)
-                                                                      Project [ws_bill_customer_sk,d_date]
-                                                                        BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
-                                                                          Filter [ws_sold_date_sk,ws_bill_customer_sk]
-                                                                            ColumnarToRow
-                                                                              InputAdapter
-                                                                                Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
-                                                                          InputAdapter
-                                                                            ReusedExchange [d_date_sk,d_date] #5
-                                                          InputAdapter
-                                                            WholeStageCodegen (20)
-                                                              Sort [c_customer_sk]
-                                                                InputAdapter
-                                                                  ReusedExchange [c_customer_sk,c_first_name,c_last_name] #6
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/explain.txt
index 3d59a67..3f52e6d 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/explain.txt
@@ -1,58 +1,59 @@
 == Physical Plan ==
-* HashAggregate (54)
-+- Exchange (53)
-   +- * HashAggregate (52)
-      +- * HashAggregate (51)
-         +- * HashAggregate (50)
-            +- * HashAggregate (49)
-               +- * HashAggregate (48)
-                  +- * HashAggregate (47)
-                     +- Exchange (46)
-                        +- * HashAggregate (45)
-                           +- * BroadcastHashJoin LeftAnti BuildRight (44)
-                              :- * BroadcastHashJoin LeftAnti BuildRight (30)
-                              :  :- * Project (16)
-                              :  :  +- * BroadcastHashJoin Inner BuildRight (15)
-                              :  :     :- * Project (10)
-                              :  :     :  +- * BroadcastHashJoin Inner BuildRight (9)
-                              :  :     :     :- * Filter (3)
-                              :  :     :     :  +- * ColumnarToRow (2)
-                              :  :     :     :     +- Scan parquet default.store_sales (1)
-                              :  :     :     +- BroadcastExchange (8)
-                              :  :     :        +- * Project (7)
-                              :  :     :           +- * Filter (6)
-                              :  :     :              +- * ColumnarToRow (5)
-                              :  :     :                 +- Scan parquet default.date_dim (4)
-                              :  :     +- BroadcastExchange (14)
-                              :  :        +- * Filter (13)
-                              :  :           +- * ColumnarToRow (12)
-                              :  :              +- Scan parquet default.customer (11)
-                              :  +- BroadcastExchange (29)
-                              :     +- * HashAggregate (28)
-                              :        +- Exchange (27)
-                              :           +- * HashAggregate (26)
-                              :              +- * Project (25)
-                              :                 +- * BroadcastHashJoin Inner BuildRight (24)
-                              :                    :- * Project (22)
-                              :                    :  +- * BroadcastHashJoin Inner BuildRight (21)
-                              :                    :     :- * Filter (19)
-                              :                    :     :  +- * ColumnarToRow (18)
-                              :                    :     :     +- Scan parquet default.catalog_sales (17)
-                              :                    :     +- ReusedExchange (20)
-                              :                    +- ReusedExchange (23)
-                              +- BroadcastExchange (43)
-                                 +- * HashAggregate (42)
-                                    +- Exchange (41)
-                                       +- * HashAggregate (40)
-                                          +- * Project (39)
-                                             +- * BroadcastHashJoin Inner BuildRight (38)
-                                                :- * Project (36)
-                                                :  +- * BroadcastHashJoin Inner BuildRight (35)
-                                                :     :- * Filter (33)
-                                                :     :  +- * ColumnarToRow (32)
-                                                :     :     +- Scan parquet default.web_sales (31)
-                                                :     +- ReusedExchange (34)
-                                                +- ReusedExchange (37)
+* HashAggregate (55)
++- Exchange (54)
+   +- * HashAggregate (53)
+      +- * HashAggregate (52)
+         +- * HashAggregate (51)
+            +- * HashAggregate (50)
+               +- * HashAggregate (49)
+                  +- * HashAggregate (48)
+                     +- Exchange (47)
+                        +- * HashAggregate (46)
+                           +- * Project (45)
+                              +- * BroadcastHashJoin LeftAnti BuildRight (44)
+                                 :- * BroadcastHashJoin LeftAnti BuildRight (30)
+                                 :  :- * Project (16)
+                                 :  :  +- * BroadcastHashJoin Inner BuildRight (15)
+                                 :  :     :- * Project (10)
+                                 :  :     :  +- * BroadcastHashJoin Inner BuildRight (9)
+                                 :  :     :     :- * Filter (3)
+                                 :  :     :     :  +- * ColumnarToRow (2)
+                                 :  :     :     :     +- Scan parquet default.store_sales (1)
+                                 :  :     :     +- BroadcastExchange (8)
+                                 :  :     :        +- * Project (7)
+                                 :  :     :           +- * Filter (6)
+                                 :  :     :              +- * ColumnarToRow (5)
+                                 :  :     :                 +- Scan parquet default.date_dim (4)
+                                 :  :     +- BroadcastExchange (14)
+                                 :  :        +- * Filter (13)
+                                 :  :           +- * ColumnarToRow (12)
+                                 :  :              +- Scan parquet default.customer (11)
+                                 :  +- BroadcastExchange (29)
+                                 :     +- * HashAggregate (28)
+                                 :        +- Exchange (27)
+                                 :           +- * HashAggregate (26)
+                                 :              +- * Project (25)
+                                 :                 +- * BroadcastHashJoin Inner BuildRight (24)
+                                 :                    :- * Project (22)
+                                 :                    :  +- * BroadcastHashJoin Inner BuildRight (21)
+                                 :                    :     :- * Filter (19)
+                                 :                    :     :  +- * ColumnarToRow (18)
+                                 :                    :     :     +- Scan parquet default.catalog_sales (17)
+                                 :                    :     +- ReusedExchange (20)
+                                 :                    +- ReusedExchange (23)
+                                 +- BroadcastExchange (43)
+                                    +- * HashAggregate (42)
+                                       +- Exchange (41)
+                                          +- * HashAggregate (40)
+                                             +- * Project (39)
+                                                +- * BroadcastHashJoin Inner BuildRight (38)
+                                                   :- * Project (36)
+                                                   :  +- * BroadcastHashJoin Inner BuildRight (35)
+                                                   :     :- * Filter (33)
+                                                   :     :  +- * ColumnarToRow (32)
+                                                   :     :     +- Scan parquet default.web_sales (31)
+                                                   :     +- ReusedExchange (34)
+                                                   +- ReusedExchange (37)
 
 
 (1) Scan parquet default.store_sales
@@ -174,7 +175,7 @@ Results [3]: [c_last_name#17, c_first_name#16, d_date#14]
 
 (27) Exchange
 Input [3]: [c_last_name#17, c_first_name#16, d_date#14]
-Arguments: hashpartitioning(c_last_name#17, c_first_name#16, d_date#14, 5), true, [id=#18]
+Arguments: hashpartitioning(c_last_name#17, c_first_name#16, d_date#14, 5), ENSURE_REQUIREMENTS, [id=#18]
 
 (28) HashAggregate [codegen id : 6]
 Input [3]: [c_last_name#17, c_first_name#16, d_date#14]
@@ -239,7 +240,7 @@ Results [3]: [c_last_name#26, c_first_name#25, d_date#23]
 
 (41) Exchange
 Input [3]: [c_last_name#26, c_first_name#25, d_date#23]
-Arguments: hashpartitioning(c_last_name#26, c_first_name#25, d_date#23, 5), true, [id=#27]
+Arguments: hashpartitioning(c_last_name#26, c_first_name#25, d_date#23, 5), ENSURE_REQUIREMENTS, [id=#27]
 
 (42) HashAggregate [codegen id : 10]
 Input [3]: [c_last_name#26, c_first_name#25, d_date#23]
@@ -257,64 +258,68 @@ Left keys [6]: [coalesce(c_last_name#9, ), isnull(c_last_name#9), coalesce(c_fir
 Right keys [6]: [coalesce(c_last_name#26, ), isnull(c_last_name#26), coalesce(c_first_name#25, ), isnull(c_first_name#25), coalesce(d_date#23, 0), isnull(d_date#23)]
 Join condition: None
 
-(45) HashAggregate [codegen id : 11]
+(45) Project [codegen id : 11]
+Output [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Input [3]: [d_date#4, c_first_name#8, c_last_name#9]
+
+(46) HashAggregate [codegen id : 11]
+Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(46) Exchange
+(47) Exchange
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
-Arguments: hashpartitioning(c_last_name#9, c_first_name#8, d_date#4, 5), true, [id=#29]
+Arguments: hashpartitioning(c_last_name#9, c_first_name#8, d_date#4, 5), ENSURE_REQUIREMENTS, [id=#29]
 
-(47) HashAggregate [codegen id : 12]
+(48) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(48) HashAggregate [codegen id : 12]
+(49) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(49) HashAggregate [codegen id : 12]
+(50) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(50) HashAggregate [codegen id : 12]
+(51) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results [3]: [c_last_name#9, c_first_name#8, d_date#4]
 
-(51) HashAggregate [codegen id : 12]
+(52) HashAggregate [codegen id : 12]
 Input [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Keys [3]: [c_last_name#9, c_first_name#8, d_date#4]
 Functions: []
 Aggregate Attributes: []
 Results: []
 
-(52) HashAggregate [codegen id : 12]
+(53) HashAggregate [codegen id : 12]
 Input: []
 Keys: []
 Functions [1]: [partial_count(1)]
 Aggregate Attributes [1]: [count#30]
 Results [1]: [count#31]
 
-(53) Exchange
+(54) Exchange
 Input [1]: [count#31]
-Arguments: SinglePartition, true, [id=#32]
+Arguments: SinglePartition, ENSURE_REQUIREMENTS, [id=#32]
 
-(54) HashAggregate [codegen id : 13]
+(55) HashAggregate [codegen id : 13]
 Input [1]: [count#31]
 Keys: []
 Functions [1]: [count(1)]
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/simplified.txt
index a5b57a4..0f32bfb 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q87/simplified.txt
@@ -13,68 +13,69 @@ WholeStageCodegen (13)
                         Exchange [c_last_name,c_first_name,d_date] #2
                           WholeStageCodegen (11)
                             HashAggregate [c_last_name,c_first_name,d_date]
-                              BroadcastHashJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                              Project [c_last_name,c_first_name,d_date]
                                 BroadcastHashJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
-                                  Project [d_date,c_first_name,c_last_name]
-                                    BroadcastHashJoin [ss_customer_sk,c_customer_sk]
-                                      Project [ss_customer_sk,d_date]
-                                        BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
-                                          Filter [ss_sold_date_sk,ss_customer_sk]
-                                            ColumnarToRow
-                                              InputAdapter
-                                                Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
-                                          InputAdapter
-                                            BroadcastExchange #3
-                                              WholeStageCodegen (1)
-                                                Project [d_date_sk,d_date]
-                                                  Filter [d_month_seq,d_date_sk]
-                                                    ColumnarToRow
-                                                      InputAdapter
-                                                        Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
-                                      InputAdapter
-                                        BroadcastExchange #4
-                                          WholeStageCodegen (2)
-                                            Filter [c_customer_sk]
+                                  BroadcastHashJoin [c_last_name,c_first_name,d_date,c_last_name,c_first_name,d_date]
+                                    Project [d_date,c_first_name,c_last_name]
+                                      BroadcastHashJoin [ss_customer_sk,c_customer_sk]
+                                        Project [ss_customer_sk,d_date]
+                                          BroadcastHashJoin [ss_sold_date_sk,d_date_sk]
+                                            Filter [ss_sold_date_sk,ss_customer_sk]
                                               ColumnarToRow
                                                 InputAdapter
-                                                  Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
+                                                  Scan parquet default.store_sales [ss_sold_date_sk,ss_customer_sk]
+                                            InputAdapter
+                                              BroadcastExchange #3
+                                                WholeStageCodegen (1)
+                                                  Project [d_date_sk,d_date]
+                                                    Filter [d_month_seq,d_date_sk]
+                                                      ColumnarToRow
+                                                        InputAdapter
+                                                          Scan parquet default.date_dim [d_date_sk,d_date,d_month_seq]
+                                        InputAdapter
+                                          BroadcastExchange #4
+                                            WholeStageCodegen (2)
+                                              Filter [c_customer_sk]
+                                                ColumnarToRow
+                                                  InputAdapter
+                                                    Scan parquet default.customer [c_customer_sk,c_first_name,c_last_name]
+                                    InputAdapter
+                                      BroadcastExchange #5
+                                        WholeStageCodegen (6)
+                                          HashAggregate [c_last_name,c_first_name,d_date]
+                                            InputAdapter
+                                              Exchange [c_last_name,c_first_name,d_date] #6
+                                                WholeStageCodegen (5)
+                                                  HashAggregate [c_last_name,c_first_name,d_date]
+                                                    Project [c_last_name,c_first_name,d_date]
+                                                      BroadcastHashJoin [cs_bill_customer_sk,c_customer_sk]
+                                                        Project [cs_bill_customer_sk,d_date]
+                                                          BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
+                                                            Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                              ColumnarToRow
+                                                                InputAdapter
+                                                                  Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
+                                                            InputAdapter
+                                                              ReusedExchange [d_date_sk,d_date] #3
+                                                        InputAdapter
+                                                          ReusedExchange [c_customer_sk,c_first_name,c_last_name] #4
                                   InputAdapter
-                                    BroadcastExchange #5
-                                      WholeStageCodegen (6)
+                                    BroadcastExchange #7
+                                      WholeStageCodegen (10)
                                         HashAggregate [c_last_name,c_first_name,d_date]
                                           InputAdapter
-                                            Exchange [c_last_name,c_first_name,d_date] #6
-                                              WholeStageCodegen (5)
+                                            Exchange [c_last_name,c_first_name,d_date] #8
+                                              WholeStageCodegen (9)
                                                 HashAggregate [c_last_name,c_first_name,d_date]
                                                   Project [c_last_name,c_first_name,d_date]
-                                                    BroadcastHashJoin [cs_bill_customer_sk,c_customer_sk]
-                                                      Project [cs_bill_customer_sk,d_date]
-                                                        BroadcastHashJoin [cs_sold_date_sk,d_date_sk]
-                                                          Filter [cs_sold_date_sk,cs_bill_customer_sk]
+                                                    BroadcastHashJoin [ws_bill_customer_sk,c_customer_sk]
+                                                      Project [ws_bill_customer_sk,d_date]
+                                                        BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
+                                                          Filter [ws_sold_date_sk,ws_bill_customer_sk]
                                                             ColumnarToRow
                                                               InputAdapter
-                                                                Scan parquet default.catalog_sales [cs_sold_date_sk,cs_bill_customer_sk]
+                                                                Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
                                                           InputAdapter
                                                             ReusedExchange [d_date_sk,d_date] #3
                                                       InputAdapter
                                                         ReusedExchange [c_customer_sk,c_first_name,c_last_name] #4
-                                InputAdapter
-                                  BroadcastExchange #7
-                                    WholeStageCodegen (10)
-                                      HashAggregate [c_last_name,c_first_name,d_date]
-                                        InputAdapter
-                                          Exchange [c_last_name,c_first_name,d_date] #8
-                                            WholeStageCodegen (9)
-                                              HashAggregate [c_last_name,c_first_name,d_date]
-                                                Project [c_last_name,c_first_name,d_date]
-                                                  BroadcastHashJoin [ws_bill_customer_sk,c_customer_sk]
-                                                    Project [ws_bill_customer_sk,d_date]
-                                                      BroadcastHashJoin [ws_sold_date_sk,d_date_sk]
-                                                        Filter [ws_sold_date_sk,ws_bill_customer_sk]
-                                                          ColumnarToRow
-                                                            InputAdapter
-                                                              Scan parquet default.web_sales [ws_sold_date_sk,ws_bill_customer_sk]
-                                                        InputAdapter
-                                                          ReusedExchange [d_date_sk,d_date] #3
-                                                    InputAdapter
-                                                      ReusedExchange [c_customer_sk,c_first_name,c_last_name] #4
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/explain.txt
index 3efe02a..0c805be 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/explain.txt
@@ -1,32 +1,33 @@
 == Physical Plan ==
-TakeOrderedAndProject (28)
-+- * HashAggregate (27)
-   +- Exchange (26)
-      +- * HashAggregate (25)
-         +- * Expand (24)
-            +- BroadcastNestedLoopJoin Inner BuildRight (23)
-               :- * Project (19)
-               :  +- * SortMergeJoin Inner (18)
-               :     :- * Sort (12)
-               :     :  +- Exchange (11)
-               :     :     +- * Project (10)
-               :     :        +- * BroadcastHashJoin Inner BuildRight (9)
-               :     :           :- * Filter (3)
-               :     :           :  +- * ColumnarToRow (2)
-               :     :           :     +- Scan parquet default.inventory (1)
-               :     :           +- BroadcastExchange (8)
-               :     :              +- * Project (7)
-               :     :                 +- * Filter (6)
-               :     :                    +- * ColumnarToRow (5)
-               :     :                       +- Scan parquet default.date_dim (4)
-               :     +- * Sort (17)
-               :        +- Exchange (16)
-               :           +- * Filter (15)
-               :              +- * ColumnarToRow (14)
-               :                 +- Scan parquet default.item (13)
-               +- BroadcastExchange (22)
-                  +- * ColumnarToRow (21)
-                     +- Scan parquet default.warehouse (20)
+TakeOrderedAndProject (29)
++- * HashAggregate (28)
+   +- Exchange (27)
+      +- * HashAggregate (26)
+         +- * Expand (25)
+            +- * Project (24)
+               +- BroadcastNestedLoopJoin Inner BuildRight (23)
+                  :- * Project (19)
+                  :  +- * SortMergeJoin Inner (18)
+                  :     :- * Sort (12)
+                  :     :  +- Exchange (11)
+                  :     :     +- * Project (10)
+                  :     :        +- * BroadcastHashJoin Inner BuildRight (9)
+                  :     :           :- * Filter (3)
+                  :     :           :  +- * ColumnarToRow (2)
+                  :     :           :     +- Scan parquet default.inventory (1)
+                  :     :           +- BroadcastExchange (8)
+                  :     :              +- * Project (7)
+                  :     :                 +- * Filter (6)
+                  :     :                    +- * ColumnarToRow (5)
+                  :     :                       +- Scan parquet default.date_dim (4)
+                  :     +- * Sort (17)
+                  :        +- Exchange (16)
+                  :           +- * Filter (15)
+                  :              +- * ColumnarToRow (14)
+                  :                 +- Scan parquet default.item (13)
+                  +- BroadcastExchange (22)
+                     +- * ColumnarToRow (21)
+                        +- Scan parquet default.warehouse (20)
 
 
 (1) Scan parquet default.inventory
@@ -76,7 +77,7 @@ Input [4]: [inv_date_sk#1, inv_item_sk#2, inv_quantity_on_hand#3, d_date_sk#4]
 
 (11) Exchange
 Input [2]: [inv_item_sk#2, inv_quantity_on_hand#3]
-Arguments: hashpartitioning(inv_item_sk#2, 5), true, [id=#7]
+Arguments: hashpartitioning(inv_item_sk#2, 5), ENSURE_REQUIREMENTS, [id=#7]
 
 (12) Sort [codegen id : 3]
 Input [2]: [inv_item_sk#2, inv_quantity_on_hand#3]
@@ -98,7 +99,7 @@ Condition : isnotnull(i_item_sk#8)
 
 (16) Exchange
 Input [5]: [i_item_sk#8, i_brand#9, i_class#10, i_category#11, i_product_name#12]
-Arguments: hashpartitioning(i_item_sk#8, 5), true, [id=#13]
+Arguments: hashpartitioning(i_item_sk#8, 5), ENSURE_REQUIREMENTS, [id=#13]
 
 (17) Sort [codegen id : 5]
 Input [5]: [i_item_sk#8, i_brand#9, i_class#10, i_category#11, i_product_name#12]
@@ -129,29 +130,33 @@ Arguments: IdentityBroadcastMode, [id=#14]
 (23) BroadcastNestedLoopJoin
 Join condition: None
 
-(24) Expand [codegen id : 8]
+(24) Project [codegen id : 8]
+Output [5]: [inv_quantity_on_hand#3, i_product_name#12, i_brand#9, i_class#10, i_category#11]
 Input [5]: [inv_quantity_on_hand#3, i_brand#9, i_class#10, i_category#11, i_product_name#12]
+
+(25) Expand [codegen id : 8]
+Input [5]: [inv_quantity_on_hand#3, i_product_name#12, i_brand#9, i_class#10, i_category#11]
 Arguments: [List(inv_quantity_on_hand#3, i_product_name#12, i_brand#9, i_class#10, i_category#11, 0), List(inv_quantity_on_hand#3, i_product_name#12, i_brand#9, i_class#10, null, 1), List(inv_quantity_on_hand#3, i_product_name#12, i_brand#9, null, null, 3), List(inv_quantity_on_hand#3, i_product_name#12, null, null, null, 7), List(inv_quantity_on_hand#3, null, null, null, null, 15)], [inv_quantity_on_hand#3, i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19]
 
-(25) HashAggregate [codegen id : 8]
+(26) HashAggregate [codegen id : 8]
 Input [6]: [inv_quantity_on_hand#3, i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19]
 Keys [5]: [i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19]
 Functions [1]: [partial_avg(cast(inv_quantity_on_hand#3 as bigint))]
 Aggregate Attributes [2]: [sum#20, count#21]
 Results [7]: [i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19, sum#22, count#23]
 
-(26) Exchange
+(27) Exchange
 Input [7]: [i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19, sum#22, count#23]
-Arguments: hashpartitioning(i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19, 5), true, [id=#24]
+Arguments: hashpartitioning(i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19, 5), ENSURE_REQUIREMENTS, [id=#24]
 
-(27) HashAggregate [codegen id : 9]
+(28) HashAggregate [codegen id : 9]
 Input [7]: [i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19, sum#22, count#23]
 Keys [5]: [i_product_name#15, i_brand#16, i_class#17, i_category#18, spark_grouping_id#19]
 Functions [1]: [avg(cast(inv_quantity_on_hand#3 as bigint))]
 Aggregate Attributes [1]: [avg(cast(inv_quantity_on_hand#3 as bigint))#25]
 Results [5]: [i_product_name#15, i_brand#16, i_class#17, i_category#18, avg(cast(inv_quantity_on_hand#3 as bigint))#25 AS qoh#26]
 
-(28) TakeOrderedAndProject
+(29) TakeOrderedAndProject
 Input [5]: [i_product_name#15, i_brand#16, i_class#17, i_category#18, qoh#26]
 Arguments: 100, [qoh#26 ASC NULLS FIRST, i_product_name#15 ASC NULLS FIRST, i_brand#16 ASC NULLS FIRST, i_class#17 ASC NULLS FIRST, i_category#18 ASC NULLS FIRST], [i_product_name#15, i_brand#16, i_class#17, i_category#18, qoh#26]
 
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/simplified.txt
index d5f40d4..2d2e46d 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22.sf100/simplified.txt
@@ -6,43 +6,44 @@ TakeOrderedAndProject [qoh,i_product_name,i_brand,i_class,i_category]
           WholeStageCodegen (8)
             HashAggregate [i_product_name,i_brand,i_class,i_category,spark_grouping_id,inv_quantity_on_hand] [sum,count,sum,count]
               Expand [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
-                InputAdapter
-                  BroadcastNestedLoopJoin
-                    WholeStageCodegen (6)
-                      Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
-                        SortMergeJoin [inv_item_sk,i_item_sk]
-                          InputAdapter
-                            WholeStageCodegen (3)
-                              Sort [inv_item_sk]
-                                InputAdapter
-                                  Exchange [inv_item_sk] #2
-                                    WholeStageCodegen (2)
-                                      Project [inv_item_sk,inv_quantity_on_hand]
-                                        BroadcastHashJoin [inv_date_sk,d_date_sk]
-                                          Filter [inv_date_sk,inv_item_sk]
-                                            ColumnarToRow
-                                              InputAdapter
-                                                Scan parquet default.inventory [inv_date_sk,inv_item_sk,inv_quantity_on_hand]
-                                          InputAdapter
-                                            BroadcastExchange #3
-                                              WholeStageCodegen (1)
-                                                Project [d_date_sk]
-                                                  Filter [d_month_seq,d_date_sk]
-                                                    ColumnarToRow
-                                                      InputAdapter
-                                                        Scan parquet default.date_dim [d_date_sk,d_month_seq]
-                          InputAdapter
-                            WholeStageCodegen (5)
-                              Sort [i_item_sk]
-                                InputAdapter
-                                  Exchange [i_item_sk] #4
-                                    WholeStageCodegen (4)
-                                      Filter [i_item_sk]
-                                        ColumnarToRow
-                                          InputAdapter
-                                            Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
-                    BroadcastExchange #5
-                      WholeStageCodegen (7)
-                        ColumnarToRow
-                          InputAdapter
-                            Scan parquet default.warehouse
+                Project [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
+                  InputAdapter
+                    BroadcastNestedLoopJoin
+                      WholeStageCodegen (6)
+                        Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
+                          SortMergeJoin [inv_item_sk,i_item_sk]
+                            InputAdapter
+                              WholeStageCodegen (3)
+                                Sort [inv_item_sk]
+                                  InputAdapter
+                                    Exchange [inv_item_sk] #2
+                                      WholeStageCodegen (2)
+                                        Project [inv_item_sk,inv_quantity_on_hand]
+                                          BroadcastHashJoin [inv_date_sk,d_date_sk]
+                                            Filter [inv_date_sk,inv_item_sk]
+                                              ColumnarToRow
+                                                InputAdapter
+                                                  Scan parquet default.inventory [inv_date_sk,inv_item_sk,inv_quantity_on_hand]
+                                            InputAdapter
+                                              BroadcastExchange #3
+                                                WholeStageCodegen (1)
+                                                  Project [d_date_sk]
+                                                    Filter [d_month_seq,d_date_sk]
+                                                      ColumnarToRow
+                                                        InputAdapter
+                                                          Scan parquet default.date_dim [d_date_sk,d_month_seq]
+                            InputAdapter
+                              WholeStageCodegen (5)
+                                Sort [i_item_sk]
+                                  InputAdapter
+                                    Exchange [i_item_sk] #4
+                                      WholeStageCodegen (4)
+                                        Filter [i_item_sk]
+                                          ColumnarToRow
+                                            InputAdapter
+                                              Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
+                      BroadcastExchange #5
+                        WholeStageCodegen (7)
+                          ColumnarToRow
+                            InputAdapter
+                              Scan parquet default.warehouse
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/explain.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/explain.txt
index ad83ede..f2a95f7 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/explain.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/explain.txt
@@ -1,29 +1,30 @@
 == Physical Plan ==
-TakeOrderedAndProject (25)
-+- * HashAggregate (24)
-   +- Exchange (23)
-      +- * HashAggregate (22)
-         +- * Expand (21)
-            +- BroadcastNestedLoopJoin Inner BuildRight (20)
-               :- * Project (16)
-               :  +- * BroadcastHashJoin Inner BuildRight (15)
-               :     :- * Project (10)
-               :     :  +- * BroadcastHashJoin Inner BuildRight (9)
-               :     :     :- * Filter (3)
-               :     :     :  +- * ColumnarToRow (2)
-               :     :     :     +- Scan parquet default.inventory (1)
-               :     :     +- BroadcastExchange (8)
-               :     :        +- * Project (7)
-               :     :           +- * Filter (6)
-               :     :              +- * ColumnarToRow (5)
-               :     :                 +- Scan parquet default.date_dim (4)
-               :     +- BroadcastExchange (14)
-               :        +- * Filter (13)
-               :           +- * ColumnarToRow (12)
-               :              +- Scan parquet default.item (11)
-               +- BroadcastExchange (19)
-                  +- * ColumnarToRow (18)
-                     +- Scan parquet default.warehouse (17)
+TakeOrderedAndProject (26)
++- * HashAggregate (25)
+   +- Exchange (24)
+      +- * HashAggregate (23)
+         +- * Expand (22)
+            +- * Project (21)
+               +- BroadcastNestedLoopJoin Inner BuildRight (20)
+                  :- * Project (16)
+                  :  +- * BroadcastHashJoin Inner BuildRight (15)
+                  :     :- * Project (10)
+                  :     :  +- * BroadcastHashJoin Inner BuildRight (9)
+                  :     :     :- * Filter (3)
+                  :     :     :  +- * ColumnarToRow (2)
+                  :     :     :     +- Scan parquet default.inventory (1)
+                  :     :     +- BroadcastExchange (8)
+                  :     :        +- * Project (7)
+                  :     :           +- * Filter (6)
+                  :     :              +- * ColumnarToRow (5)
+                  :     :                 +- Scan parquet default.date_dim (4)
+                  :     +- BroadcastExchange (14)
+                  :        +- * Filter (13)
+                  :           +- * ColumnarToRow (12)
+                  :              +- Scan parquet default.item (11)
+                  +- BroadcastExchange (19)
+                     +- * ColumnarToRow (18)
+                        +- Scan parquet default.warehouse (17)
 
 
 (1) Scan parquet default.inventory
@@ -114,29 +115,33 @@ Arguments: IdentityBroadcastMode, [id=#13]
 (20) BroadcastNestedLoopJoin
 Join condition: None
 
-(21) Expand [codegen id : 5]
+(21) Project [codegen id : 5]
+Output [5]: [inv_quantity_on_hand#3, i_product_name#11, i_brand#8, i_class#9, i_category#10]
 Input [5]: [inv_quantity_on_hand#3, i_brand#8, i_class#9, i_category#10, i_product_name#11]
+
+(22) Expand [codegen id : 5]
+Input [5]: [inv_quantity_on_hand#3, i_product_name#11, i_brand#8, i_class#9, i_category#10]
 Arguments: [List(inv_quantity_on_hand#3, i_product_name#11, i_brand#8, i_class#9, i_category#10, 0), List(inv_quantity_on_hand#3, i_product_name#11, i_brand#8, i_class#9, null, 1), List(inv_quantity_on_hand#3, i_product_name#11, i_brand#8, null, null, 3), List(inv_quantity_on_hand#3, i_product_name#11, null, null, null, 7), List(inv_quantity_on_hand#3, null, null, null, null, 15)], [inv_quantity_on_hand#3, i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 
-(22) HashAggregate [codegen id : 5]
+(23) HashAggregate [codegen id : 5]
 Input [6]: [inv_quantity_on_hand#3, i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 Keys [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 Functions [1]: [partial_avg(cast(inv_quantity_on_hand#3 as bigint))]
 Aggregate Attributes [2]: [sum#19, count#20]
 Results [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
 
-(23) Exchange
+(24) Exchange
 Input [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
-Arguments: hashpartitioning(i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, 5), true, [id=#23]
+Arguments: hashpartitioning(i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, 5), ENSURE_REQUIREMENTS, [id=#23]
 
-(24) HashAggregate [codegen id : 6]
+(25) HashAggregate [codegen id : 6]
 Input [7]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18, sum#21, count#22]
 Keys [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, spark_grouping_id#18]
 Functions [1]: [avg(cast(inv_quantity_on_hand#3 as bigint))]
 Aggregate Attributes [1]: [avg(cast(inv_quantity_on_hand#3 as bigint))#24]
 Results [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, avg(cast(inv_quantity_on_hand#3 as bigint))#24 AS qoh#25]
 
-(25) TakeOrderedAndProject
+(26) TakeOrderedAndProject
 Input [5]: [i_product_name#14, i_brand#15, i_class#16, i_category#17, qoh#25]
 Arguments: 100, [qoh#25 ASC NULLS FIRST, i_product_name#14 ASC NULLS FIRST, i_brand#15 ASC NULLS FIRST, i_class#16 ASC NULLS FIRST, i_category#17 ASC NULLS FIRST], [i_product_name#14, i_brand#15, i_class#16, i_category#17, qoh#25]
 
diff --git a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/simplified.txt b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/simplified.txt
index cdf9335..559e963 100644
--- a/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/simplified.txt
+++ b/sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q22/simplified.txt
@@ -6,34 +6,35 @@ TakeOrderedAndProject [qoh,i_product_name,i_brand,i_class,i_category]
           WholeStageCodegen (5)
             HashAggregate [i_product_name,i_brand,i_class,i_category,spark_grouping_id,inv_quantity_on_hand] [sum,count,sum,count]
               Expand [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
-                InputAdapter
-                  BroadcastNestedLoopJoin
-                    WholeStageCodegen (3)
-                      Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
-                        BroadcastHashJoin [inv_item_sk,i_item_sk]
-                          Project [inv_item_sk,inv_quantity_on_hand]
-                            BroadcastHashJoin [inv_date_sk,d_date_sk]
-                              Filter [inv_date_sk,inv_item_sk]
-                                ColumnarToRow
-                                  InputAdapter
-                                    Scan parquet default.inventory [inv_date_sk,inv_item_sk,inv_quantity_on_hand]
-                              InputAdapter
-                                BroadcastExchange #2
-                                  WholeStageCodegen (1)
-                                    Project [d_date_sk]
-                                      Filter [d_month_seq,d_date_sk]
-                                        ColumnarToRow
-                                          InputAdapter
-                                            Scan parquet default.date_dim [d_date_sk,d_month_seq]
-                          InputAdapter
-                            BroadcastExchange #3
-                              WholeStageCodegen (2)
-                                Filter [i_item_sk]
+                Project [inv_quantity_on_hand,i_product_name,i_brand,i_class,i_category]
+                  InputAdapter
+                    BroadcastNestedLoopJoin
+                      WholeStageCodegen (3)
+                        Project [inv_quantity_on_hand,i_brand,i_class,i_category,i_product_name]
+                          BroadcastHashJoin [inv_item_sk,i_item_sk]
+                            Project [inv_item_sk,inv_quantity_on_hand]
+                              BroadcastHashJoin [inv_date_sk,d_date_sk]
+                                Filter [inv_date_sk,inv_item_sk]
                                   ColumnarToRow
                                     InputAdapter
-                                      Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
-                    BroadcastExchange #4
-                      WholeStageCodegen (4)
-                        ColumnarToRow
-                          InputAdapter
-                            Scan parquet default.warehouse
+                                      Scan parquet default.inventory [inv_date_sk,inv_item_sk,inv_quantity_on_hand]
+                                InputAdapter
+                                  BroadcastExchange #2
+                                    WholeStageCodegen (1)
+                                      Project [d_date_sk]
+                                        Filter [d_month_seq,d_date_sk]
+                                          ColumnarToRow
+                                            InputAdapter
+                                              Scan parquet default.date_dim [d_date_sk,d_month_seq]
+                            InputAdapter
+                              BroadcastExchange #3
+                                WholeStageCodegen (2)
+                                  Filter [i_item_sk]
+                                    ColumnarToRow
+                                      InputAdapter
+                                        Scan parquet default.item [i_item_sk,i_brand,i_class,i_category,i_product_name]
+                      BroadcastExchange #4
+                        WholeStageCodegen (4)
+                          ColumnarToRow
+                            InputAdapter
+                              Scan parquet default.warehouse
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
index 5bcec9b..dbdb066 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/LogicalPlanTagInSparkPlanSuite.scala
@@ -131,7 +131,7 @@ class LogicalPlanTagInSparkPlanSuite extends TPCDSQuerySuite with DisableAdaptiv
   }
 
   private def getLogicalPlan(node: SparkPlan): LogicalPlan = {
-    node.getTagValue(SparkPlan.LOGICAL_PLAN_TAG).getOrElse {
+    node.logicalLink.getOrElse {
       fail(node.getClass.getSimpleName + " does not have a logical plan link")
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
index ab8cd96..7da66e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RemoveRedundantProjectsSuite.scala
@@ -213,8 +213,23 @@ abstract class RemoveRedundantProjectsSuiteBase
         |ORDER BY t1.key, t2.key, s1, s2
         |LIMIT 10
         |""".stripMargin
-    assertProjectExec(query, 0, 3)
+    // The Project above the Expand is not removed due to SPARK-36020.
+    assertProjectExec(query, 1, 3)
+  }
 
+  test("SPARK-36020: Project should not be removed when child's logical link is different") {
+    val query =
+      """
+        |WITH t AS (
+        | SELECT key, a, b, c, explode(d) AS d FROM testView
+        |)
+        |SELECT t1.key, t1.d, t2.key
+        |FROM (SELECT d, key FROM t) t1
+        |JOIN testView t2 ON t1.key = t2.key
+        |""".stripMargin
+    // The ProjectExec above the GenerateExec should not be removed because
+    // they have different logical links.
+    assertProjectExec(query, enabled = 2, disabled = 3)
   }
 
   Seq("true", "false").foreach { codegenEnabled =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index c8c4f97..fcc27de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -1481,4 +1481,25 @@ class AdaptiveQueryExecSuite
       }
     }
   }
+
+  test("SPARK-36020: Check logical link in remove redundant projects") {
+    withTempView("t") {
+      spark.range(10).selectExpr("id % 10 as key", "cast(id * 2 as int) as a",
+        "cast(id * 3 as int) as b", "array(id, id + 1, id + 3) as c").createOrReplaceTempView("t")
+      withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+        val query =
+          """
+            |WITH tt AS (
+            | SELECT key, a, b, explode(c) AS c FROM t
+            |)
+            |SELECT t1.key, t1.c, t2.key, t2.c
+            |FROM (SELECT a, b, c, key FROM tt WHERE a > 1) t1
+            |JOIN (SELECT a, b, c, key FROM tt) t2
+            |  ON t1.key = t2.key
+            |""".stripMargin
+        // here we only need to make sure this query can run
+        runAdaptiveAndVerifyResult(query)
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org