Posted to reviews@spark.apache.org by "cloud-fan (via GitHub)" <gi...@apache.org> on 2023/03/15 05:12:10 UTC

[GitHub] [spark] cloud-fan commented on a diff in pull request #40385: [SPARK-42753] ReusedExchange refers to non-existent nodes

cloud-fan commented on code in PR #40385:
URL: https://github.com/apache/spark/pull/40385#discussion_r1136540926


##########
sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala:
##########
@@ -771,6 +775,130 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
       FormattedMode,
       statistics)
   }
+
+  test("SPARK-42753: Process subtree for ReusedExchange with unknown child") {
+    // Simulate a simplified subtree with a ReusedExchange pointing to an Exchange node that has
+    // no ID. This is a rare edge case that could arise during AQE if there are multiple
+    // ReusedExchanges. We check to make sure the child Exchange gets an ID and gets printed.
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused = ReusedExchangeExec(Seq.empty, exchange)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](reused, appendStr(_))
+
+    val expectedTree = """|ReusedExchange (1)
+                          |+- Exchange (3)
+                          |   +- Range (2)""".stripMargin
+
+    assert(results.contains(expectedTree))
+    assert(results.contains("(1) ReusedExchange [Reuses operator id: 3]"))
+  }
+
+  test("SPARK-42753: Two ReusedExchange Sharing Same Subtree") {
+    // Simulate a simplified subtree with two ReusedExchange nodes reusing the same exchange.
+    // Only one exchange node should be printed.
+    val exchange = ShuffleExchangeExec(UnknownPartitioning(10),
+      RangeExec(org.apache.spark.sql.catalyst.plans.logical.Range(0, 1000, 1, 10)))
+    val reused1 = ReusedExchangeExec(Seq.empty, exchange)
+    val reused2 = ReusedExchangeExec(Seq.empty, exchange)
+    val join = SortMergeJoinExec(Seq.empty, Seq.empty, Inner, None, reused1, reused2)
+
+    var results = ""
+    def appendStr(str: String): Unit = {
+      results = results + str
+    }
+
+    ExplainUtils.processPlan[SparkPlan](join, appendStr(_))
+
+    val expectedTree = """|SortMergeJoin Inner (3)
+                          |:- ReusedExchange (1)
+                          |:  +- Exchange (5)
+                          |:     +- Range (4)
+                          |+- ReusedExchange (2)

Review Comment:
   I find it confusing that only one `ReusedExchange` displays the referenced `Exchange` while the others don't. Could you reconsider my proposal to add a new `Adaptively Optimized Out Exchanges` section?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

