Posted to issues@spark.apache.org by "Benyi Wang (JIRA)" <ji...@apache.org> on 2017/10/06 03:10:00 UTC

[jira] [Created] (SPARK-22211) LimitPushDown optimization for FullOuterJoin generates wrong results

Benyi Wang created SPARK-22211:
----------------------------------

             Summary: LimitPushDown optimization for FullOuterJoin generates wrong results
                 Key: SPARK-22211
                 URL: https://issues.apache.org/jira/browse/SPARK-22211
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.2.0
         Environment: community.cloud.databricks.com
Runtime Version 3.2 (includes Apache Spark 2.2.0, Scala 2.11)
            Reporter: Benyi Wang


LimitPushDown pushes LocalLimit to one side for FullOuterJoin, but this may generate a wrong result:

Assume we call limit(1), LocalLimit is pushed to the left side, and it selects id=999. The right side still has 100K rows, including 999, so the join output will be:
- one row (999, 999)
- 99,999 remaining rows of the form (null, xxx)

Once you call show(), the row (999, 999) has only about a 1 in 100,000 chance of being selected by CollectLimit. In the notebook below every id exists on both sides, so a (null, xxx) row should never appear in the result.
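
To make the row counts concrete, here is a minimal sketch that uses plain Scala collections in place of DataFrames (no Spark needed). The object LimitPushDownSketch, the helper fullOuterJoin, and the smaller size n = 1000 are assumptions made for illustration; this is not Spark's implementation, and it assumes ids are unique on each side.

```scala
object LimitPushDownSketch {
  // A joined row: (left id, right id); None plays the role of SQL null.
  type Row = (Option[Int], Option[Int])

  // Hypothetical helper: full outer join on id equality.
  // Assumes ids are unique within each side.
  def fullOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[Row] = {
    val leftSet = left.toSet
    val rightSet = right.toSet
    // Every left row, paired with its match on the right if one exists.
    val fromLeft: Seq[Row] =
      left.map(id => (Some(id), if (rightSet(id)) Some(id) else None))
    // Right rows that have no partner on the left.
    val rightOnly: Seq[Row] =
      right.filterNot(leftSet).map(id => (None, Some(id)))
    fromLeft ++ rightOnly
  }

  def isMatched(r: Row): Boolean = r._1.isDefined && r._2.isDefined

  def main(args: Array[String]): Unit = {
    val n = 1000 // smaller than the 100000 in the report, to keep the sketch fast
    val dl = scala.util.Random.shuffle((1 to n).toList)
    val dr = scala.util.Random.shuffle((1 to n).toList)

    // Join first, limit afterwards: every row is matched.
    val correct = fullOuterJoin(dl, dr)
    // Limit pushed below the join on the left side only.
    val pushed = fullOuterJoin(dl.take(1), dr)

    println(s"no pushdown:   ${correct.count(isMatched)} matched of ${correct.size} rows")
    println(s"with pushdown: ${pushed.count(isMatched)} matched of ${pushed.size} rows")
  }
}
```

With the left side cut to one row, the join still emits n rows but only one of them is matched; joining the full inputs emits n matched rows and no nulls, so any (null, xxx) row returned by the outer limit is a row that the original query could not have produced.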

A correct optimization might be to:
- push down the limit to one side
- but also convert the join to a broadcast LeftOuterJoin or RightOuterJoin that preserves the limited side.
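
The suggested rewrite can be sketched the same way with plain Scala collections instead of DataFrames. The object LeftOuterAfterLimitSketch and the helper leftOuterJoin are hypothetical names used only for illustration, ids are assumed unique on the right side, and this is not a proposal for how the Spark rule itself should be coded.

```scala
object LeftOuterAfterLimitSketch {
  // A joined row: (left id, right id); None plays the role of SQL null.
  type Row = (Option[Int], Option[Int])

  // Hypothetical helper: left outer join on id equality.
  // Assumes ids are unique on the right side.
  def leftOuterJoin(left: Seq[Int], right: Seq[Int]): Seq[Row] = {
    val rightSet = right.toSet
    left.map { id =>
      val row: Row = (Some(id), if (rightSet(id)) Some(id) else None)
      row
    }
  }

  def main(args: Array[String]): Unit = {
    val n = 1000 // smaller than the 100000 in the report, to keep the sketch fast
    val dl = scala.util.Random.shuffle((1 to n).toList)
    val dr = scala.util.Random.shuffle((1 to n).toList)

    // Limit the left side first, then left-outer-join it against the full right side.
    val result = leftOuterJoin(dl.take(1), dr).take(1)

    // The single surviving row keeps its right-side match.
    println(result)
  }
}
```

Because a left outer join never emits right-only rows, limiting the left side cannot introduce (null, xxx) rows; each row it returns is a left row together with its match, which is also a row of the full outer join.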

Here is my notebook:
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/349451637617406/2750346983121008/6888856075277290/latest.html
{code:java}
import scala.util.Random._

val dl = shuffle(1 to 100000).toDF("id")
val dr = shuffle(1 to 100000).toDF("id")

println("data frame dl:")
dl.explain

println("data frame dr:")
dr.explain

val j = dl.join(dr, dl("id") === dr("id"), "outer").limit(1)

j.explain

j.show(false)
{code}

{code}
data frame dl:
== Physical Plan ==
LocalTableScan [id#10]
data frame dr:
== Physical Plan ==
LocalTableScan [id#16]
== Physical Plan ==
CollectLimit 1
+- SortMergeJoin [id#10], [id#16], FullOuter
   :- *Sort [id#10 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#10, 200)
   :     +- *LocalLimit 1
   :        +- LocalTableScan [id#10]
   +- *Sort [id#16 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(id#16, 200)
         +- LocalTableScan [id#16]
import scala.util.Random._
dl: org.apache.spark.sql.DataFrame = [id: int]
dr: org.apache.spark.sql.DataFrame = [id: int]
j: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: int, id: int]

+----+---+
|id  |id |
+----+---+
|null|148|
+----+---+
{code}


