Posted to issues@spark.apache.org by "Filipe Oliveira (Jira)" <ji...@apache.org> on 2022/10/26 16:00:01 UTC

[jira] [Created] (SPARK-40923) QueryStageExec canonical plan won't support columnar

Filipe Oliveira created SPARK-40923:
---------------------------------------

             Summary: QueryStageExec canonical plan won't support columnar
                 Key: SPARK-40923
                 URL: https://issues.apache.org/jira/browse/SPARK-40923
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.2.0
            Reporter: Filipe Oliveira


https://issues.apache.org/jira/browse/SPARK-34168 changed {{QueryStageExec}} canonicalization to return the plan as it was before optimization rules are applied. This means a plan may support columnar execution while its canonicalization does not (for example, when a plugin transforms an exchange into a columnar one).
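
A self-contained sketch of the mismatch, using stand-in types ({{PlanLike}}, {{RowExchange}} and {{ColumnarExchange}} are illustrative, not the real Spark classes): the live stage wraps the columnar exchange produced by a plugin's rules, while the canonical form is built from the row-based plan that existed before those rules ran.

{code:java}
// Stand-in types for illustration only; not the actual Spark API.
object CanonicalMismatchSketch extends App {
  trait PlanLike { def supportsColumnar: Boolean }
  case object RowExchange extends PlanLike { val supportsColumnar = false }
  case object ColumnarExchange extends PlanLike { val supportsColumnar = true }

  // A plugin's columnar rule replaced the row-based exchange in the live plan...
  val optimizedPlan: PlanLike = ColumnarExchange
  // ...but since SPARK-34168 the canonical form comes from the plan as it was
  // before those rules were applied.
  val canonicalPlan: PlanLike = RowExchange

  assert(optimizedPlan.supportsColumnar)  // the live stage is columnar
  assert(canonicalPlan.supportsColumnar)  // fails, like ColumnarToRowExec's assert
}
{code}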



This is an issue when a later adaptive stage gets the canonicalized plan and asserts {{supportsColumnar}}, as {{ColumnarToRowExec}} does:

{code:java}
assert(Utils.isInRunningSparkTask || child.supportsColumnar){code}

I believe it can be fixed by adding a {{supportsColumnar}} field to {{BroadcastQueryStageExec}} and {{ShuffleQueryStageExec}}, taking its value from the plan after optimizations, as sketched below.
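
A rough, hypothetical sketch of that idea, again with stand-in types rather than the real {{QueryStageExec}} hierarchy: the stage records {{supportsColumnar}} as an explicit field populated from the post-optimization plan, so the canonical copy reports the same value instead of re-deriving it from the pre-optimization plan.

{code:java}
// Stand-in types for illustration only; not the actual Spark API.
object ProposedFixSketch extends App {
  trait PlanLike { def supportsColumnar: Boolean }
  case object RowExchange extends PlanLike { val supportsColumnar = false }
  case object ColumnarExchange extends PlanLike { val supportsColumnar = true }

  // The stage carries supportsColumnar explicitly, taken from the optimized plan.
  case class ShuffleStage(plan: PlanLike,      // plan after optimization rules
                          canonical: PlanLike, // plan before those rules
                          supportsColumnar: Boolean) extends PlanLike

  val optimized: PlanLike = ColumnarExchange
  val stage = ShuffleStage(optimized, RowExchange, optimized.supportsColumnar)

  // Canonicalization swaps in the pre-optimization plan, but the recorded
  // field keeps the stage's columnar support consistent with the live plan.
  val canonicalStage = stage.copy(plan = stage.canonical)
  assert(canonicalStage.supportsColumnar) // now holds
}
{code}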


The scenario below is from TPC-H Q2, running on Spark 3.2.1.

{code:java}
BroadcastExchange HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero(input[3, double, true])), input[0, bigint, true]),false), [id=#2226]
+- *(1) ColumnarToRow
   +- AQEShuffleRead local
      +- ShuffleQueryStage 8
         +- ColumnarExchange hashpartitioning(knownfloatingpointnormalized(normalizenanandzero(ps_supplycost#3)), p_partkey#15L, 200), ENSURE_REQUIREMENTS, [id=#1953] {code}
Canonicalizing this stage then fails with the following stack trace:
{code:java}
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.execution.ColumnarToRowExec.<init>(Columnar.scala:69)
    at org.apache.spark.sql.execution.ColumnarToRowExec.copy(Columnar.scala:67)
    at org.apache.spark.sql.execution.ColumnarToRowExec.withNewChildInternal(Columnar.scala:208)
    at org.apache.spark.sql.execution.ColumnarToRowExec.withNewChildInternal(Columnar.scala:67)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1136)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1134)
    at org.apache.spark.sql.execution.ColumnarToRowExec.withNewChildrenInternal(Columnar.scala:67)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:359)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:358)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:517)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:485)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:484)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:499)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:499)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:485)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:484)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doCanonicalize(BroadcastExchangeExec.scala:95)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doCanonicalize(BroadcastExchangeExec.scala:78)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:485)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:484)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:493){code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
