Posted to issues@spark.apache.org by "Benyi Wang (JIRA)" <ji...@apache.org> on 2017/10/06 03:10:00 UTC
[jira] [Created] (SPARK-22211) LimitPushDown optimization for
FullOuterJoin generates wrong results
Benyi Wang created SPARK-22211:
----------------------------------
Summary: LimitPushDown optimization for FullOuterJoin generates wrong results
Key: SPARK-22211
URL: https://issues.apache.org/jira/browse/SPARK-22211
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.2.0
Environment: community.cloud.databricks.com
Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
Reporter: Benyi Wang
LimitPushDown pushes a LocalLimit to one side of a FullOuterJoin, but this can produce wrong results.
Assume we use limit(1), the LocalLimit is pushed to the left side, and id=999 is selected there, while the right side has 100K rows including 999. The join result will then be:
- one row (999, 999)
- the remaining rows, all of the form (null, xxx)
In the example below both sides contain the same ids, so the correct join result has no (null, xxx) rows at all.
When you call show(), the row (999, 999) has only a 1/100000 chance of being selected by CollectLimit.
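The effect can be reproduced without Spark. What follows is a minimal plain-Scala sketch, not Spark's implementation: the object `LimitPushDownSketch` and the helper `fullOuterJoin` are hypothetical names, ids are assumed unique on each side, and 1,000 ids stand in for the 100K rows described above.

```scala
object LimitPushDownSketch {
  // A joined row; None models SQL NULL on that side.
  type Row = (Option[Int], Option[Int])

  // Hypothetical helper: full outer equi-join of two id lists (ids unique per side).
  def fullOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[Row] = {
    val leftIds = left.toSet
    val rightIds = right.toSet
    // Every left row appears once, paired with its match or with NULL.
    val fromLeft: Seq[Row] = left.map { l =>
      val r: Option[Int] = if (rightIds.contains(l)) Some(l) else None
      (Option(l), r)
    }
    // Right rows without a match on the left get NULL on the left.
    val rightOnly: Seq[Row] = right.filterNot(leftIds.contains).map { r =>
      (Option.empty[Int], Option(r))
    }
    fromLeft ++ rightOnly
  }

  val ids: Seq[Int] = 1 to 1000

  // Correct semantics: join first, apply the limit afterwards.
  val correct: Seq[Row] = fullOuterJoin(ids, ids)
  val correctSet: Set[Row] = correct.toSet

  // Models LocalLimit 1 pushed to the left side, having kept id = 999.
  val pushed: Seq[Row] = fullOuterJoin(Seq(999), ids)

  // Rows the pushed-down plan can return that the real join never contains.
  val spurious: Seq[Row] = pushed.filterNot(correctSet.contains)

  def main(args: Array[String]): Unit = {
    println(s"rows after push-down: ${pushed.size}, spurious: ${spurious.size}")
  }
}
```

In this model every row of `pushed` except (999, 999) is absent from `correct`, so a final limit(1) over `pushed` almost always returns a row the real join does not contain.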
A correct optimization might be to:
- push down the limit,
- but convert the join to a broadcast LeftOuterJoin or RightOuterJoin.
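The idea in the list above can be illustrated with a small plain-Scala model. This is only a sketch under assumptions, not a proposed Spark patch: `LimitWithOuterJoinSketch` and `leftOuterJoin` are hypothetical names, ids are assumed unique on each side, and the left side is assumed to have at least as many rows as the limit.

```scala
object LimitWithOuterJoinSketch {
  // A joined row; None models SQL NULL on that side.
  type Row = (Option[Int], Option[Int])

  // Hypothetical helper: left outer equi-join of two id lists (ids unique per side).
  def leftOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[Row] = {
    val rightIds = right.toSet
    left.map { l =>
      val r: Option[Int] = if (rightIds.contains(l)) Some(l) else None
      (Option(l), r)
    }
  }

  val ids: Seq[Int] = 1 to 1000

  // Limit pushed to the left side (keeping id = 999), join converted to left outer.
  val limited: Seq[Row] = leftOuterJoin(Seq(999), ids)

  def main(args: Array[String]): Unit = {
    println(limited)
  }
}
```

Because a left outer join never emits rows for unmatched right ids, the only row produced here is (999, 999), which does belong to the full outer join result. If the left side had fewer rows than the limit, this rewrite could return fewer rows than the original query, so that case would need separate handling.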
Here is my notebook:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
{code:java}
import scala.util.Random._
val dl = shuffle(1 to 100000).toDF("id")
val dr = shuffle(1 to 100000).toDF("id")
println("data frame dl:")
dl.explain
println("data frame dr:")
dr.explain
val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)
j.explain
j.show(false)
{code}
{code}
data frame dl:
== Physical Plan ==
LocalTableScan [id#10]
data frame dr:
== Physical Plan ==
LocalTableScan [id#16]
== Physical Plan ==
CollectLimit 1
+- SortMergeJoin [id#10], [id#16], FullOuter
   :- *Sort [id#10 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#10, 200)
   :     +- *LocalLimit 1
   :        +- LocalTableScan [id#10]
   +- *Sort [id#16 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#16, 200)
         +- LocalTableScan [id#16]
import scala.util.Random._
dl: org.apache.spark.sql.DataFrame = [id: int]
dr: org.apache.spark.sql.DataFrame = [id: int]
j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]
+----+---+
|id  |id |
+----+---+
|null|148|
+----+---+
{code}