Posted to issues@spark.apache.org by "Filipe Oliveira (Jira)" <ji...@apache.org> on 2022/10/26 16:03:00 UTC

[jira] [Updated] (SPARK-40923) QueryStageExec canonical plan won't support columnar

     [ https://issues.apache.org/jira/browse/SPARK-40923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Filipe Oliveira updated SPARK-40923:
------------------------------------
    Description: 
https://issues.apache.org/jira/browse/SPARK-34168 changed QueryStageExec canonicalization to return the plan as it was before the optimization rules were applied. As a result, a plan may support columnar execution while its canonicalized form does not (for example, when a plugin transforms an exchange into a columnar one).
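
The relevant shape in Spark 3.2 looks roughly like this (a simplified sketch, not the exact source; other members elided):
{code:java}
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}

// Simplified sketch: _canonicalized is captured from the plan *before* the
// AQE optimization rules run, so it never reflects a plugin's columnar
// rewrite of the exchange.
abstract class QueryStageExec extends LeafExecNode {
  def plan: SparkPlan            // the optimized plan that actually executes
  def _canonicalized: SparkPlan  // canonical form of the pre-optimization plan
  override def doCanonicalize(): SparkPlan = _canonicalized
}
// Consequence: stage.plan.supportsColumnar may be true while
// stage.canonicalized.supportsColumnar is false.
{code}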

This becomes an issue when a later adaptive stage canonicalizes the plan and hits an assertion on {{supportsColumnar}}, as {{ColumnarToRowExec}} does:
{code:java}
assert(Utils.isInRunningSparkTask || child.supportsColumnar){code}
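The assertion fails during canonicalization itself, because rebuilding the node re-runs the constructor. A hedged sketch of the failing path (the names match the stack trace below):
{code:java}
// QueryPlan.doCanonicalize canonicalizes the children and then rebuilds the
// node with TreeNode.withNewChildren:
//   withNewChildren -> withNewChildInternal -> copy(child = canonicalChild)
// copy() invokes the ColumnarToRowExec constructor again, so the assert above
// is evaluated against the canonicalized child -- whose supportsColumnar is
// false, because it is the pre-optimization (row-based) plan -> AssertionError
{code}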

I believe this can be fixed by adding a {{supportsColumnar}} field to {{BroadcastQueryStageExec}} and {{ShuffleQueryStageExec}} that takes its value from the plan after the optimization rules have been applied.
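
A hypothetical sketch of what that could look like (other members elided; the value would be captured once the optimization rules have run):
{code:java}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.QueryStageExec

// Hypothetical sketch: carry supportsColumnar explicitly so the canonical
// copy reports the same value as the optimized plan that actually runs.
case class ShuffleQueryStageExec(
    override val id: Int,
    override val plan: SparkPlan,
    override val _canonicalized: SparkPlan,
    override val supportsColumnar: Boolean)  // plan.supportsColumnar,
  extends QueryStageExec                     // captured after optimization
{code}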


The scenario below is from Q2 of TPC-H, running on Spark 3.2.1:
{code:java}
BroadcastExchange HashedRelationBroadcastMode(List(knownfloatingpointnormalized(normalizenanandzero(input[3, double, true])), input[0, bigint, true]),false), [id=#2226]
+- *(1) ColumnarToRow
   +- AQEShuffleRead local
      +- ShuffleQueryStage 8
         +- ColumnarExchange hashpartitioning(knownfloatingpointnormalized(normalizenanandzero(ps_supplycost#3)), p_partkey#15L, 200), ENSURE_REQUIREMENTS, [id=#1953] {code}
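For context, a hedged sketch of how a plan ends up in this state: a columnar plugin (registered via {{SparkSessionExtensions.injectColumnar}}) rewrites the row-based shuffle exchange after the stage's canonical plan has already been captured. {{ColumnarShuffleExchangeExec}} below is an assumed plugin operator, not part of Spark:
{code:java}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec

// Hypothetical plugin rule: swaps the row-based exchange for a columnar one,
// so the executed plan supports columnar while the canonical plan does not.
class MyColumnarRule extends ColumnarRule {
  override def preColumnarTransitions: Rule[SparkPlan] = new Rule[SparkPlan] {
    override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
      case e: ShuffleExchangeExec =>
        ColumnarShuffleExchangeExec(e)  // assumed plugin operator
    }
  }
}
{code}
Canonicalization of the broadcast exchange then fails with the stack trace below.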
{code:java}
    at scala.Predef$.assert(Predef.scala:208)
    at org.apache.spark.sql.execution.ColumnarToRowExec.<init>(Columnar.scala:69)
    at org.apache.spark.sql.execution.ColumnarToRowExec.copy(Columnar.scala:67)
    at org.apache.spark.sql.execution.ColumnarToRowExec.withNewChildInternal(Columnar.scala:208)
    at org.apache.spark.sql.execution.ColumnarToRowExec.withNewChildInternal(Columnar.scala:67)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal(TreeNode.scala:1136)
    at org.apache.spark.sql.catalyst.trees.UnaryLike.withNewChildrenInternal$(TreeNode.scala:1134)
    at org.apache.spark.sql.execution.ColumnarToRowExec.withNewChildrenInternal(Columnar.scala:67)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$withNewChildren$2(TreeNode.scala:359)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.trees.TreeNode.withNewChildren(TreeNode.scala:358)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:517)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:485)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:484)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$doCanonicalize$1(QueryPlan.scala:499)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at scala.collection.IterableLike.foreach(IterableLike.scala:74)
    at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.doCanonicalize(QueryPlan.scala:499)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:485)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:484)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doCanonicalize(BroadcastExchangeExec.scala:95)
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doCanonicalize(BroadcastExchangeExec.scala:78)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:485)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:484)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.createQueryStages(AdaptiveSparkPlanExec.scala:493){code}

> QueryStageExec canonical plan won't support columnar
> ----------------------------------------------------
>
>                 Key: SPARK-40923
>                 URL: https://issues.apache.org/jira/browse/SPARK-40923
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.2.0
>            Reporter: Filipe Oliveira
>            Priority: Minor
>


