Posted to issues@spark.apache.org by "Xiao Li (JIRA)" <ji...@apache.org> on 2017/11/03 06:06:00 UTC
[jira] [Updated] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results
[ https://issues.apache.org/jira/browse/SPARK-22211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li updated SPARK-22211:
----------------------------
Target Version/s: 2.2.1, 2.3.0
> LimitPushDown optimization for FullOuterJoin generates wrong results
> --------------------------------------------------------------------
>
> Key: SPARK-22211
> URL: https://issues.apache.org/jira/browse/SPARK-22211
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.2.0
> Environment: on community.cloud.databricks.com
> Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
> Reporter: Benyi Wang
> Priority: Major
>
> LimitPushDown pushes a LocalLimit into one side of a FullOuterJoin, but this can generate a wrong result:
> Assume we call limit(1) and the LocalLimit is pushed to the left side, which keeps only the row with id=999. The right side still has 100K rows, including id=999, so the join output will be
> - one row (999, 999)
> - 99,999 rows of the form (null, xxx)
> Once you call show(), the row (999, 999) has only a 1/100000 chance of being selected by CollectLimit.
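> To see why pushing the limit below the join is unsound, here is a minimal sketch in plain Scala (no Spark; the {{fullOuterJoin}} helper is hypothetical and only models full-outer-join semantics over unique ids):
> {code:scala}
> // Hypothetical helper: full outer join of two lists of unique ids.
> def fullOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[(Option[Int], Option[Int])] = {
>   val l = left.toSet
>   val r = right.toSet
>   (l intersect r).toSeq.map(i => (Option(i), Option(i))) ++
>     (l diff r).toSeq.map(i => (Option(i), None)) ++
>     (r diff l).toSeq.map(i => (None, Option(i)))
> }
>
> val left  = 1 to 100000
> val right = 1 to 100000
>
> // Join first, then limit: every output row is a matched (i, i) pair,
> // so whichever single row CollectLimit picks is a correct answer.
> fullOuterJoin(left, right).take(1)                    // always (Some(i), Some(i))
>
> // Pushdown order: limit the left side first, then join. The join output
> // still has 100000 rows, but only one of them is matched. Spark makes no
> // row-order guarantee, so CollectLimit would return a (null, x) row with
> // probability 99999/100000.
> val pushed = fullOuterJoin(left.take(1), right)
> pushed.count(p => p._1.isDefined && p._2.isDefined)   // 1
> pushed.size                                           // 100000
> {code}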
> The actual optimization might be to
> - push down the limit, and
> - convert the join to a broadcast LeftOuterJoin or RightOuterJoin (a DataFrame-level sketch of this idea follows the repro output below).
> Here is my notebook:
> https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
> {code:scala}
> import scala.util.Random._
> import spark.implicits._  // for toDF; already in scope in spark-shell / Databricks notebooks
>
> val dl = shuffle(1 to 100000).toDF("id")
> val dr = shuffle(1 to 100000).toDF("id")
> println("data frame dl:")
> dl.explain
> println("data frame dr:")
> dr.explain
> // Full outer join followed by limit(1): the optimizer pushes LocalLimit 1
> // into the left side only, as the physical plan below shows.
> val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
> j.explain
> j.show(false)
> {code}
> {code}
> data frame dl:
> == Physical Plan ==
> LocalTableScan [id#10]
> data frame dr:
> == Physical Plan ==
> LocalTableScan [id#16]
> == Physical Plan ==
> CollectLimit 1
> +- SortMergeJoin [id#10], [id#16], FullOuter
>    :- *Sort [id#10 ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(id#10, 200)
>    :     +- *LocalLimit 1
>    :        +- LocalTableScan [id#10]
>    +- *Sort [id#16 ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(id#16, 200)
>          +- LocalTableScan [id#16]
> import scala.util.Random._
> dl: org.apache.spark.sql.DataFrame = [id: int]
> dr: org.apache.spark.sql.DataFrame = [id: int]
> j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
> +----+---+
> |id  |id |
> +----+---+
> |null|148|
> +----+---+
> {code}
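> In DataFrame terms, the suggested rewrite would look roughly like this (a sketch only, using the dl/dr frames from the repro above, not the actual optimizer rule):
> {code:scala}
> import org.apache.spark.sql.functions.broadcast
>
> // Limit the left side explicitly, then downgrade the full outer join to a
> // broadcast left outer join. Every row of the limited left side survives a
> // left outer join, and each such row is also a valid row of the original
> // full outer join, so the final limit(1) can no longer be crowded out by
> // unmatched right-side rows.
> val dlLimited = dl.limit(1)
> val j2 = dlLimited
>   .join(broadcast(dr), dlLimited("id") === dr("id"), "left_outer")
>   .limit(1)
> j2.explain
> j2.show(false)
> {code}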