You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "StevenChenDatabricks (via GitHub)" <gi...@apache.org> on 2023/03/12 19:27:00 UTC

[GitHub] [spark] StevenChenDatabricks opened a new pull request, #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

StevenChenDatabricks opened a new pull request, #40385:
URL: https://github.com/apache/spark/pull/40385

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1141961253


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -73,14 +78,34 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    */
   def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
     try {
+      // Initialize a reference-unique set of Operators to avoid accdiental overwrites and to allow
+      // intentional overwriting of IDs generated in previous AQE iteration
+      val operators = newSetFromMap[QueryPlan[_]](new IdentityHashMap())

Review Comment:
   do we really need a `IdentityHashMap`? `SparkPlan` has a unique `id`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1468621742

   @cloud-fan Thanks for the idea and response!
   
   1. I don't think this issue doesn't affects `ReusedSubquery` because of how its processed and printed. The current algorithm finds all Subquery nodes (including `ReusedSubquery`) and for each Subquery, it traverses the Subquery subtree to generate the IDs if they are missing. 
   Furthermore, a `ReusedSubquery` does not print the details of the Subquery it reuses whereas for ReusedExchange it does print the Exchange ID being reused. For a `ReusedSubquery`, all that is printed is this line:
   ```Subquery:5 Hosting operator id = 50 Hosting Expression = ReusedSubquery Subquery scalar-subquery#31, [id=#32]```
   `Hosting operator ID` is the parent operator that contains the `ReusedSubquery`. The subtree of the `ReusedSubquery` is not printed anywhere and the `ReusedSubquery` node itself is not printed in the main plan tree either. Even if there are non-existing children, the issue is not surfaced in the Explain plan by default.
   I guess there's still a chance it might affect Spark UI whereby the IDs in the subtree of a `ReusedSubquery` are incorrect because the IDs were generated in a previous AQE iteration... I'm not sure. I think it's best to wait and see if a ticket/bug like this is ever reported. 
   
   2. My fix detects all the ReusedExchanges with non-existing children and generate IDs on them. I guess your question is what if multiple `ReusedExchange` reference the same non-existing `Exchange`? That's a good point and I need to account for that edge case in the code in case that is possible.
   
   With regards to your idea for a section of non-existing `Exchanges`: we already only print each operator exactly once in the node details section. As shown in the PR description: I currently print out the plan subtree of the Non-Existing Exchange below the `ReusedExchange` (since that subtree is not shown anywhere else) and the node details while maintaining uniqueness.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136540926


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with a two ReusedExchange reusing the same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(Seq.empty, exchange)
+    val reused2 = ReusedExchangeExec(Seq.empty, exchange)
+    val join = SortMergeJoinExec(Seq.empty, Seq.empty, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |:  +- Exchange (5)
+                          |:     +- Range (4)
+                          |+- ReusedExchange (2)

Review Comment:
   I find it confusing to have only one `ReusedExchange` displaying the referenced `Exchange` and others don't. Can you reconsider my proposal of adding a new `Adaptively Optimized Out Exchanges` section?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1467287615

   @cloud-fan Yes this is purely UI and EXPLAIN issue. It does not affect query execution. 
   
   I'm not sure how AQE context stageCache map would help. The issue in EXPLAIN is that the ReusedExchange.child references a Exchange node that is not referenced anywhere else in the plan tree so we need to generate IDs on the subtree rooted at ReusedExchange.child and print them out. To do this, we need a way to check whether the ReusedExchange.child is referenced anywhere else - if they are not referenced anywhere else, we need to recursively generate IDs for subtree. I keep a HashSet of nodes with IDs already generated and check ReusedExchange.child against it to see if we need to recursively generate IDs on the subtree. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1467245909

   Yea AQE may remove materialized query stages due to optimizations like empty relation propagation, but I think it's fine as the shuffle files are still there (we don't unregister the shuffle), so the reused shuffle operator can still read these shuffle files using the shuffle id. The problem with EXPLAIN is it only looks for the referenced exchange in the query plan tree, I think we can also look up from the AQE stage cache map?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1143134007


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,163 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple

Review Comment:
   ```suggestion
       // no ID. This is a rare edge case that could arise during AQE if there are multiple
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1142444491


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -119,17 +155,40 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    * @param plan Input query plan to process
    * @param startOperatorID The start value of operation id. The subsequent operations will be
    *                        assigned higher value.
+   * @param visited A unique set of operators visited by generateOperatorIds. The set is scoped
+   *                at the callsite function processPlan. It serves two purpose: Firstly, it is
+   *                used to avoid accidentally overwriting existing IDs that were generated in
+   *                the same processPlan call. Secondly, it is used to allow for intentional ID
+   *                overwriting as part of SPARK-42753 where an Adaptively Optimized Out Exchange
+   *                and its subtree may contain IDs that were generated in a previous AQE
+   *                iteration's processPlan call which would result in incorrect IDs.
+   * @param reusedExchanges A unique set of ReusedExchange nodes visited which will be used to
+   *                        idenitfy adaptively optimized out exchanges in SPARK-42753.
+   * @param addReusedExchanges Whether to add ReusedExchange nodes to reusedExchanges set. We set it
+   *                           to false to avoid processing more nested ReusedExchanges nodes in the
+   *                           subtree of an Adpatively Optimized Out Exchange.
    * @return The last generated operation id for this input plan. This is to ensure we always
    *         assign incrementing unique id to each operator.
    */
-  private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): Int = {
+  private def generateOperatorIDs(
+      plan: QueryPlan[_],
+      startOperatorID: Int,
+      visited: Set[QueryPlan[_]],
+      reusedExchanges: ArrayBuffer[ReusedExchangeExec],
+      addReusedExchanges: Boolean): Int = {
     var currentOperationID = startOperatorID
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return currentOperationID
     }
 
-    def setOpId(plan: QueryPlan[_]): Unit = if (plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {

Review Comment:
   I'm removing the check "if OP_ID_TAG is empty" because we are allowing overwriting the OP_ID_TAG since in some "Optimized Out Exchange" it may contain nodes that have "OP_ID_TAG" generated from previous `processPlan` call in a previous AQE iteration. Therefore the "OP_ID_TAG" would be incorrect and needs overwriting.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1469225186

   So we randomly pick one `ReusedExchange` to print its corresponding `Exchange`?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136663376


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with a two ReusedExchange reusing the same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(Seq.empty, exchange)
+    val reused2 = ReusedExchangeExec(Seq.empty, exchange)
+    val join = SortMergeJoinExec(Seq.empty, Seq.empty, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |:  +- Exchange (5)
+                          |:     +- Range (4)
+                          |+- ReusedExchange (2)

Review Comment:
   Thanks Wenchen. I think maybe I don't understand your "Section" idea. Do we print out the subtree structure in the "Section" too? Like for example:
   ```
   ===== Adaptively Optimized Out Exchanges ========
   +- Exchange (5)
       +- Range (4)
   
   (5) Exchange
   Input [1]: [id#0L]
   Arguments: UnknownPartitioning(10), 
   
   (4) Range
   Output [1]: [id#0L]
   Arguments: Range (0, 1000, step=1, splits=Some(10))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1468104875

   @StevenChenDatabricks Thanks for the explanation, now I understand it.
   
   I still have some high-level questions:
   1. Does `ReusedSubquery` have the same issue?
   2. what if there are more than one `ReusedExchange`s that references non-existing exchange?
   
   I have a new idea that we still display `ReusedExchange` as it is, but we add a new section to display non-existing `Exchanges`:
   ```
   ==== Adaptively Optimized Out Exchanges ====
   (132) Exchange
   Input [3]: [sr_customer_sk#217, sr_store_sk#221, sum#327L]
   Arguments: hashpartitioning(sr_store_sk#221, 200), ENSURE_REQUIREMENTS, [plan_id=1791]
   ```
   
   Then there is no duplication even if more than one `ReusedExchange` refernece it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136542981


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with a two ReusedExchange reusing the same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(Seq.empty, exchange)
+    val reused2 = ReusedExchangeExec(Seq.empty, exchange)
+    val join = SortMergeJoinExec(Seq.empty, Seq.empty, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |:  +- Exchange (5)
+                          |:     +- Range (4)
+                          |+- ReusedExchange (2)

Review Comment:
   We can also get feedback from more people as this is not a tech problem but more about user experience. cc @maryannxue @viirya @gengliangwang 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136498864


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {

Review Comment:
   @cloud-fan yes we only print one copy of the `Exchange` if its shared between multiple `ReusedExchange`. The order is sort of "random" because it prints the first time the node is traversed.
   
   I added a unit test here to demo this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136498864


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {

Review Comment:
   @cloud-fan yes we only print one copy of the `Exchange` if its shared between multiple `ReusedExchange`. The order is sort of "random" because it prints the first time the `Exchange` is traversed.
   
   I added a unit test here to demo this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1140560154


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -119,17 +143,35 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    * @param plan Input query plan to process
    * @param startOperatorID The start value of operation id. The subsequent operations will be
    *                        assigned higher value.
+   * @param overwrite Whether to overwrite existing IDs if they already exist. This is needed

Review Comment:
   TODO: update comments



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1469073120

   > we already only print each operator exactly once in the node details section
   
   What if more than one `ReusedExchange` referencing the same `Exchange`? Will we print the `Exchange` twice?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1141538044


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with a two ReusedExchange reusing the same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(Seq.empty, exchange)
+    val reused2 = ReusedExchangeExec(Seq.empty, exchange)
+    val join = SortMergeJoinExec(Seq.empty, Seq.empty, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |:  +- Exchange (5)
+                          |:     +- Range (4)
+                          |+- ReusedExchange (2)

Review Comment:
   @cloud-fan I've updated implementation to output the "Adaptively Optimized Out Exchanges" section as you suggested :) Please let me know thoughts.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1141963547


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -119,17 +155,40 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    * @param plan Input query plan to process
    * @param startOperatorID The start value of operation id. The subsequent operations will be
    *                        assigned higher value.
+   * @param visited A unique set of operators visited by generateOperatorIds. The set is scoped
+   *                at the callsite function processPlan. It serves two purpose: Firstly, it is
+   *                used to avoid accidentally overwriting existing IDs that were generated in
+   *                the same processPlan call. Secondly, it is used to allow for intentional ID
+   *                overwriting as part of SPARK-42753 where an Adaptively Optimized Out Exchange
+   *                and its subtree may contain IDs that were generated in a previous AQE
+   *                iteration's processPlan call which would result in incorrect IDs.
+   * @param reusedExchanges A unique set of ReusedExchange nodes visited which will be used to
+   *                        idenitfy adaptively optimized out exchanges in SPARK-42753.
+   * @param addReusedExchanges Whether to add ReusedExchange nodes to reusedExchanges set. We set it
+   *                           to false to avoid processing more nested ReusedExchanges nodes in the
+   *                           subtree of an Adpatively Optimized Out Exchange.
    * @return The last generated operation id for this input plan. This is to ensure we always
    *         assign incrementing unique id to each operator.
    */
-  private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): Int = {
+  private def generateOperatorIDs(
+      plan: QueryPlan[_],
+      startOperatorID: Int,
+      visited: Set[QueryPlan[_]],
+      reusedExchanges: ArrayBuffer[ReusedExchangeExec],
+      addReusedExchanges: Boolean): Int = {
     var currentOperationID = startOperatorID
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return currentOperationID
     }
 
-    def setOpId(plan: QueryPlan[_]): Unit = if (plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {

Review Comment:
   we don't need `QueryPlan.OP_ID_TAG` anymore?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1478591539

   @cloud-fan I've addressed your comments. Thanks for the review! 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan closed pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes
URL: https://github.com/apache/spark/pull/40385


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1143136328


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,163 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)

Review Comment:
   let's create valid plans for good hygiene. The shuffle partitioning can't be `UnknownPartitioning`, we can use `RoundRobinPartitioning`. The `ReusedExchangeExec` should have outout, we can use `exchange.output.map(_.newInstance)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1478798727

   thanks, merging to master/3.4!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136672487


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with a two ReusedExchange reusing the same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(Seq.empty, exchange)
+    val reused2 = ReusedExchangeExec(Seq.empty, exchange)
+    val join = SortMergeJoinExec(Seq.empty, Seq.empty, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |:  +- Exchange (5)
+                          |:     +- Range (4)
+                          |+- ReusedExchange (2)

Review Comment:
   @cloud-fan 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1141961865


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -73,14 +78,34 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    */
   def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
     try {
+      // Initialize a reference-unique set of Operators to avoid accdiental overwrites and to allow
+      // intentional overwriting of IDs generated in previous AQE iteration
+      val operators = newSetFromMap[QueryPlan[_]](new IdentityHashMap())
+      // Initialize an array of ReusedExchanges to help find Adaptively Optimized Out
+      // Exchanges as part of SPARK-42753
+      val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]

Review Comment:
   This is an `ArrayBuffer`, how do we make sure it's unique? Or we don't need to?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on PR #40385:
URL: https://github.com/apache/spark/pull/40385#issuecomment-1469194144

   @cloud-fan It wouldn't because `collectOperatorsWithID` in ExplainUtils is responsible for collecting the list of nodes to print out. It uses a BitSet `collectedOperators` that's globally shared to ensure each node is "collected" and printed exactly once.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1143136328


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,163 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)

Review Comment:
   let's create valid plans for good hygiene. The shuffle partitioning can't be `UnknownPartitioning`, we can use `RoundRobinPartitioning`. The `ReusedExchangeExec` should have outout, we can use `exchange.output`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1142811318


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -73,14 +78,34 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    */
   def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
     try {
+      // Initialize a reference-unique set of Operators to avoid accdiental overwrites and to allow
+      // intentional overwriting of IDs generated in previous AQE iteration
+      val operators = newSetFromMap[QueryPlan[_]](new IdentityHashMap())
+      // Initialize an array of ReusedExchanges to help find Adaptively Optimized Out
+      // Exchanges as part of SPARK-42753
+      val reusedExchanges = ArrayBuffer.empty[ReusedExchangeExec]

Review Comment:
   This `ArrayBuffer` is guaranteed unique because we only insert `ReusedExchange` nodes in the `setOpId` function which checks against the `IdentityHashMap`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] StevenChenDatabricks commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "StevenChenDatabricks (via GitHub)" <gi...@apache.org>.
StevenChenDatabricks commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1142810230


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala:
##########
@@ -73,14 +78,34 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    */
   def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
     try {
+      // Initialize a reference-unique set of Operators to avoid accdiental overwrites and to allow
+      // intentional overwriting of IDs generated in previous AQE iteration
+      val operators = newSetFromMap[QueryPlan[_]](new IdentityHashMap())

Review Comment:
   `SparkPlan` has ID but not all `QueryPlan` have ID and this function allows all `QueryPlan`. I attempted creating HashMap of IDs and casting the `plan` argument as a `SparkPlan` but it would fail some tests cases where the node isn't a `SparkPlan`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136812183


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge cases that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with a two ReusedExchange reusing the same exchange
+    // Only one exchange node should be printed
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(Seq.empty, exchange)
+    val reused2 = ReusedExchangeExec(Seq.empty, exchange)
+    val join = SortMergeJoinExec(Seq.empty, Seq.empty, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |:  +- Exchange (5)
+                          |:     +- Range (4)
+                          |+- ReusedExchange (2)

Review Comment:
   Yea that is my idea. It's similar to the subquery section.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org