Posted to issues@spark.apache.org by "Andrew Ash (JIRA)" <ji...@apache.org> on 2017/10/26 16:38:00 UTC

[jira] [Commented] (SPARK-22042) ReorderJoinPredicates can break when child's partitioning is not decided

    [ https://issues.apache.org/jira/browse/SPARK-22042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16220734#comment-16220734 ] 

Andrew Ash commented on SPARK-22042:
------------------------------------

Hi, I'm seeing this problem as well. Thanks for investigating and putting up a PR, [~tejasp]! Have you been running any of your clusters with a patched version of Spark that includes that change, and has it behaved as expected?

One of my users independently provided this repro:
{noformat}
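// Run in spark-shell, where `sc` and the toDF implicits are already available.
val rows = List(1, 2, 3, 4, 5, 6)

// df1 has a different partition count than df2 and df3.
val df1 = sc.parallelize(rows).toDF("col").repartition(1)
val df2 = sc.parallelize(rows).toDF("col").repartition(2)
val df3 = sc.parallelize(rows).toDF("col").repartition(2)

// Two chained joins on the same column.
val dd1 = df1
  .join(df2, df1.col("col").equalTo(df2.col("col")))
  .join(df3, df2.col("col").equalTo(df3.col("col")))

dd1.show()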
{noformat}

> ReorderJoinPredicates can break when child's partitioning is not decided
> ------------------------------------------------------------------------
>
>                 Key: SPARK-22042
>                 URL: https://issues.apache.org/jira/browse/SPARK-22042
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.0, 2.2.0
>            Reporter: Tejas Patil
>            Priority: Minor
>
> When `ReorderJoinPredicates` tries to get the `outputPartitioning` of its children, the children may not be fully constructed, because the child subtree still has to go through other planner rules.
> In this particular case, the child is `SortMergeJoinExec`. Since the required `Exchange` operators are not yet in place (because `EnsureRequirements` runs _after_ `ReorderJoinPredicates`), the join's children do not have their partitioning defined. This breaks while creating the `PartitioningCollection` here: https://github.com/apache/spark/blob/94439997d57875838a8283c543f9b44705d3a503/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L69
> Small repro:
> {noformat}
> context.sql("SET spark.sql.autoBroadcastJoinThreshold=0")
> val df = (0 until 50).map(i => (i % 5, i % 13, i.toString)).toDF("i", "j", "k")
> df.write.format("parquet").saveAsTable("table1")
> df.write.format("parquet").saveAsTable("table2")
> df.write.format("parquet").bucketBy(8, "j", "k").saveAsTable("bucketed_table")
> sql("""
>   SELECT *
>   FROM (
>     SELECT a.i, a.j, a.k
>     FROM bucketed_table a
>     JOIN table1 b
>     ON a.i = b.i
>   ) c
>   JOIN table2
>   ON c.i = table2.i
> """).explain
> {noformat}
> This fails with:
> {noformat}
> java.lang.IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
>   at scala.Predef$.require(Predef.scala:224)
>   at org.apache.spark.sql.catalyst.plans.physical.PartitioningCollection.<init>(partitioning.scala:324)
>   at org.apache.spark.sql.execution.joins.SortMergeJoinExec.outputPartitioning(SortMergeJoinExec.scala:69)
>   at org.apache.spark.sql.execution.ProjectExec.outputPartitioning(basicPhysicalOperators.scala:82)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:91)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates$$anonfun$apply$1.applyOrElse(ReorderJoinPredicates.scala:76)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>   at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>   at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:76)
>   at org.apache.spark.sql.execution.joins.ReorderJoinPredicates.apply(ReorderJoinPredicates.scala:34)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:100)
>   at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>   at scala.collection.immutable.List.foldLeft(List.scala:84)
>   at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:100)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:90)
>   at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:90)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:114)
>   at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:201)
>   at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:147)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:78)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:75)
>   at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:91)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:464)
>   at org.apache.spark.sql.Dataset.explain(Dataset.scala:477)
>   ... 60 elided
> {noformat}
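
The requirement failure in the trace above can be modeled without Spark. The following is a minimal sketch in plain Scala, not the actual Spark code: `PartitioningSketch`, the simplified `HashPartitioning`, and the `joinOutputPartitioning` helper are hypothetical stand-ins that only mirror the invariant named in the exception message. It assumes that an inner join reports the partitionings of both children as one collection, which is what the description and the stack trace suggest for `SortMergeJoinExec`.

```scala
import scala.util.Try

object PartitioningSketch {
  // Simplified stand-ins for Spark's physical partitioning classes (illustration only).
  sealed trait Partitioning { def numPartitions: Int }

  final case class HashPartitioning(column: String, numPartitions: Int) extends Partitioning

  final case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
    // The invariant quoted in the exception message.
    require(
      partitionings.map(_.numPartitions).distinct.length == 1,
      "PartitioningCollection requires all of its partitionings have the same numPartitions.")

    def numPartitions: Int = partitionings.head.numPartitions
  }

  // Hypothetical helper: the output partitioning an inner join would report for its two children.
  def joinOutputPartitioning(left: Partitioning, right: Partitioning): Partitioning =
    PartitioningCollection(Seq(left, right))

  def main(args: Array[String]): Unit = {
    // Before any exchange is inserted, the children keep their own partition counts (1 and 2 here).
    val beforeExchange =
      Try(joinOutputPartitioning(HashPartitioning("col", 1), HashPartitioning("col", 2)))
    println(beforeExchange.isFailure) // true: the require(...) throws IllegalArgumentException

    // Once both sides share one partition count (200 is an assumed value), construction succeeds.
    val afterExchange =
      Try(joinOutputPartitioning(HashPartitioning("col", 200), HashPartitioning("col", 200)))
    println(afterExchange.map(_.numPartitions)) // Success(200)
  }
}
```

In this model, asking the join for its output partitioning is only safe once both children agree on a partition count, which is why a rule that reads `outputPartitioning` before the exchanges are in place can hit the `require`.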


