Posted to issues@spark.apache.org by "Hyukjin Kwon (Jira)" <ji...@apache.org> on 2021/12/09 01:59:00 UTC

[jira] [Commented] (SPARK-37579) Called spark.sql multiple times,union multiple DataFrame, groupBy and pivot, join other table view cause exception

    [ https://issues.apache.org/jira/browse/SPARK-37579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17456092#comment-17456092 ] 

Hyukjin Kwon commented on SPARK-37579:
--------------------------------------

Spark 2.4 is EOL. Would you mind checking whether this passes with Spark 3+?
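
For anyone retesting on Spark 3+, the reproduction steps in the quoted description can be approximated with a minimal Scala sketch like the following. The `src` view, its sample rows, the second DataFrame `d7`, and the object name `Spark37579Repro` are all hypothetical, since the report does not include the actual queries or data. Disabling broadcast joins is also an assumption, made so that the small local join is planned as a shuffle-based join instead of a broadcast join. Whether this triggers the exception on a given build is not confirmed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{collect_list, concat_ws}

object Spark37579Repro {

  /** Follows steps 1-5 of the report on hypothetical data and returns the final count. */
  def run(spark: SparkSession): Long = {
    import spark.implicits._

    // Hypothetical source data; the report does not show the real tables.
    Seq(("a", "x", "1"), ("a", "y", "2"), ("b", "x", "3"), ("b", "y", "4"))
      .toDF("c1", "c2", "value")
      .createOrReplaceTempView("src")

    // Step 1: call spark.sql several times to get d1..d4.
    val parts: Seq[DataFrame] = Seq("1", "2", "3", "4").map { v =>
      spark.sql(s"SELECT c1, c2, value FROM src WHERE value = '$v'")
    }

    // Step 2: combine them into d5 with Dataset#unionByName.
    val d5 = parts.reduce((a, b) => a.unionByName(b))

    // Step 3: the groupBy/pivot/agg expression from the report.
    val d6 = d5.groupBy("c1").pivot("c2").agg(concat_ws(", ", collect_list("value")))

    // Step 4: join with another DataFrame (d7 is made up for this sketch).
    val d7 = Seq(("a", 10), ("b", 20)).toDF("c1", "other")
    val joined = d6.join(d7, Seq("c1"))

    // Step 5: an action such as count triggers the Spark job.
    joined.count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SPARK-37579-repro")
      .master("local[2]")
      // The reporter sees the failure only with adaptive execution enabled.
      .config("spark.sql.adaptive.enabled", "true")
      // Assumption: disable broadcast joins so the join goes through a shuffle.
      .config("spark.sql.autoBroadcastJoinThreshold", "-1")
      .getOrCreate()
    try println(s"count = ${run(spark)}")
    finally spark.stop()
  }
}
```

If the job completes without the exception, the sample rows above give a count of 2. Real data volumes and partition counts may matter for reproducing the failure, so a clean run on this toy input does not by itself show that the bug is gone.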

> Called spark.sql multiple times,union multiple DataFrame, groupBy and pivot, join other table view cause exception
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-37579
>                 URL: https://issues.apache.org/jira/browse/SPARK-37579
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.4.7
>            Reporter: page
>            Priority: Major
>
> Possible steps to reproduce:
> 1. Run spark.sql multiple times to get a list of DataFrames [d1, d2, d3, d4]
> 2. Combine the list [d1, d2, d3, d4] into a single DataFrame d5 by calling Dataset#unionByName
> 3. Run
> {code:java}
> d5.groupBy("c1").pivot("c2").agg(concat_ws(", ", collect_list("value"))){code}
> to produce DataFrame d6
> 4. Join DataFrame d6 with another DataFrame d7
> 5. Call an action such as count to trigger a Spark job
> 6. An exception is thrown
>  
> stack trace:
> org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:226)
> at org.apache.spark.sql.execution.adaptive.QueryStage.executeChildStages(QueryStage.scala:88)
> at org.apache.spark.sql.execution.adaptive.QueryStage.prepareExecuteStage(QueryStage.scala:136)
> at org.apache.spark.sql.execution.adaptive.QueryStage.executeCollect(QueryStage.scala:242)
> at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2837)
> at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2836)
> at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3441)
> at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:92)
> at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:139)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:87)
> at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withAction(Dataset.scala:3440)
> at org.apache.spark.sql.Dataset.count(Dataset.scala:2836)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(2, 1)
> at org.apache.spark.rdd.ZippedPartitionsBaseRDD.getPartitions(ZippedPartitionsRDD.scala:57)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
> at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
> at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
> at org.apache.spark.ShuffleDependency.<init>(Dependency.scala:94)
> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec$.prepareShuffleDependency(ShuffleExchangeExec.scala:361)
> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:69)
> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.eagerExecute(ShuffleExchangeExec.scala:112)
> at org.apache.spark.sql.execution.adaptive.ShuffleQueryStage.executeStage(QueryStage.scala:284)
> at org.apache.spark.sql.execution.adaptive.QueryStage.doExecute(QueryStage.scala:236)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:137)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:161)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:158)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:133)
> at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$8$$anonfun$apply$2$$anonfun$apply$3.apply(QueryStage.scala:81)
> at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$8$$anonfun$apply$2$$anonfun$apply$3.apply(QueryStage.scala:81)
> at org.apache.spark.sql.execution.SQLExecution$.withExecutionIdAndJobDesc(SQLExecution.scala:157)
> at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$8$$anonfun$apply$2.apply(QueryStage.scala:80)
> at org.apache.spark.sql.execution.adaptive.QueryStage$$anonfun$8$$anonfun$apply$2.apply(QueryStage.scala:78)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>  
>  
> {color:#de350b}*There are three points to note:*{color}
> 1. I never call *zip* or any similar method myself
> 2. When I set the parameter "spark.sql.adaptive.enabled" to "false", the error disappears
> 3. Others have encountered this problem: https://github.com/Intel-bigdata/spark-adaptive/issues/73
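
The workaround in point 2 can be scoped to a single action instead of the whole application. The following is a minimal Scala sketch under stated assumptions: the `AdaptiveToggle` object and `withAdaptiveDisabled` helper are hypothetical names, and it assumes the build in use honors "spark.sql.adaptive.enabled" as a runtime session setting that is read when the query is planned.

```scala
import org.apache.spark.sql.SparkSession

object AdaptiveToggle {
  private val Key = "spark.sql.adaptive.enabled"

  /** Runs `body` with adaptive execution disabled, then restores the previous setting. */
  def withAdaptiveDisabled[T](spark: SparkSession)(body: => T): T = {
    // Remember the current value, if any, so it can be put back afterwards.
    val previous = spark.conf.getOption(Key)
    spark.conf.set(Key, "false")
    try {
      body
    } finally {
      previous match {
        case Some(value) => spark.conf.set(Key, value)
        case None        => spark.conf.unset(Key)
      }
    }
  }
}
```

A call such as `AdaptiveToggle.withAdaptiveDisabled(spark) { d6.join(d7, Seq("c1")).count() }` would then run only that job without adaptive execution. This avoids the symptom but does not address the underlying cause.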


