You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "cloud-fan (via GitHub)" <gi...@apache.org> on 2023/03/22 13:53:48 UTC

[GitHub] [spark] cloud-fan commented on a diff in pull request #40522: [SPARK-42101][SQL][FOLLOWUP] Make QueryStageExec.resultOption and isMaterialized consistent

cloud-fan commented on code in PR #40522:
URL: https://github.com/apache/spark/pull/40522#discussion_r1144851788


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala:
##########
@@ -561,34 +562,30 @@ case class AdaptiveSparkPlanExec(
   }
 
   private def newQueryStage(plan: SparkPlan): QueryStageExec = {
-    val optimizedPlan = plan match {
-      case e: Exchange =>
-        e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
-      case _ => plan
-    }
-    val newPlan = applyPhysicalRules(
-      optimizedPlan,
-      postStageCreationRules(outputsColumnar = plan.supportsColumnar),
-      Some((planChangeLogger, "AQE Post Stage Creation")))
     val queryStage = plan match {
-      case s: ShuffleExchangeLike =>
-        if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
-          throw SparkException.internalError(
-            "Custom columnar rules cannot transform shuffle node to something else.")
-        }
-        ShuffleQueryStageExec(currentStageId, newPlan, s.canonicalized)
-      case b: BroadcastExchangeLike =>
-        if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
-          throw SparkException.internalError(
-            "Custom columnar rules cannot transform broadcast node to something else.")
+      case e: Exchange =>
+        val optimized = e.withNewChildren(Seq(optimizeQueryStage(e.child, isFinalStage = false)))
+        val newPlan = applyPhysicalRules(
+          optimized,
+          postStageCreationRules(outputsColumnar = plan.supportsColumnar),
+          Some((planChangeLogger, "AQE Post Stage Creation")))
+        if (e.isInstanceOf[ShuffleExchangeLike]) {
+          if (!newPlan.isInstanceOf[ShuffleExchangeLike]) {
+            throw SparkException.internalError(
+              "Custom columnar rules cannot transform shuffle node to something else.")
+          }
+          ShuffleQueryStageExec(currentStageId, newPlan, e.canonicalized)
+        } else {
+          assert(e.isInstanceOf[BroadcastExchangeLike])
+          if (!newPlan.isInstanceOf[BroadcastExchangeLike]) {
+            throw SparkException.internalError(
+              "Custom columnar rules cannot transform broadcast node to something else.")
+          }
+          BroadcastQueryStageExec(currentStageId, newPlan, e.canonicalized)
         }
-        BroadcastQueryStageExec(currentStageId, newPlan, b.canonicalized)
       case i: InMemoryTableScanExec =>
-        if (!newPlan.isInstanceOf[InMemoryTableScanExec]) {

Review Comment:
   A small cleanup: we don't need to run any rules for `InMemoryTableScanExec`, as it's a leaf node and supports both columnar and row-based output.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org