Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:13:44 UTC

[jira] [Resolved] (SPARK-22923) Non-equi join(theta join) should use sort merge join

     [ https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22923.
----------------------------------
    Resolution: Incomplete

> Non-equi join(theta join) should use sort merge join
> ----------------------------------------------------
>
>                 Key: SPARK-22923
>                 URL: https://issues.apache.org/jira/browse/SPARK-22923
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: Juhong Jung
>            Priority: Major
>              Labels: bulk-closed
>
> I found this issue while doing a self join to retrieve the last record for each key (similar to https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group).
> Currently, SortMergeJoinExec is chosen only if the join expression includes an equality expression, because the SparkStrategies pattern matcher uses ExtractEquiJoinKeys (see the apply method in SparkStrategies.scala).
> When the join condition contains only non-equi expressions, it is not matched by ExtractEquiJoinKeys and falls through to the last case, so BroadcastNestedLoopJoinExec is chosen even if the data size is larger than spark.sql.autoBroadcastJoinThreshold.
> For example, with a DataFrame of about 50 GB and spark.sql.autoBroadcastJoinThreshold at its 10 MB default, BroadcastNestedLoopJoinExec is still chosen and the large DataFrame is sent to the driver to be broadcast.
> The job is then aborted by the spark.driver.maxResultSize limit, or the driver container dies with an OutOfMemoryError.
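> As a quick illustration (not in the original description), the two limits involved can be inspected from a spark-shell session, assuming `spark` is the active SparkSession:
> {code:scala}
> // Broadcast threshold; the default is 10485760 bytes (10 MB).
> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
>
> // Cap on results collected to the driver (the limit behind the reported job abort).
> spark.sparkContext.getConf.get("spark.driver.maxResultSize", "1g")
>
> // Disabling broadcasting does not help here: a left outer join whose condition
> // has no equality still falls back to BroadcastNestedLoopJoinExec.
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
> {code}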
> I think sort merge join is a good strategy for non-equi joins, so modifying the pattern matcher is one way to let the Spark planner choose sort merge join for non-equi joins.
> As a workaround, I have added a trivial equality condition to the join expression so that sort merge join is used.
> The code below is a sample that reproduces the problem; a sketch of the equality workaround follows the code block.
> {code:scala}
> // Assumes a spark-shell session, where `spark` is the active SparkSession.
> import org.apache.spark.sql.functions.col
> import spark.implicits._  // toDF and the 'symbol column syntax
>
> // (id, number) pairs with duplicate numbers dropped.
> val data = (1 to 10000).
>   map(t => (t, scala.util.Random.nextInt(10000))).
>   toDF("id", "number").
>   dropDuplicates("number").
>   as("data")
>
> // The same data with every column renamed to later_*.
> val laterData = data.
>   as("laterData").
>   select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
>
> // Left outer self join on a non-equi-only condition; rows with no larger
> // later_number survive the isNull filter.
> val latestData = data.
>   join(
>     laterData,
>     'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
>
> // The physical plan shows BroadcastNestedLoopJoin rather than SortMergeJoin.
> latestData.explain
> {code}
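> For completeness, a minimal sketch (not from the original report; column names are hypothetical) of the per-group variant from the Stack Overflow question above, where the group key supplies the equality that ExtractEquiJoinKeys needs while the '<' remains a residual condition:
> {code:scala}
> // Reuses the imports from the sample above; `events` has columns (key, ts, id).
> val events = Seq((1, 10L, "a"), (1, 20L, "b"), (2, 5L, "c")).toDF("key", "ts", "id")
> val later = events.select(events.columns.map(c => col(c).as("later_" + c)): _*)
>
> val latestPerKey = events.
>   join(later, 'key === 'later_key && 'ts < 'later_ts, "leftouter").
>   filter('later_id.isNull)
>
> // With inputs larger than spark.sql.autoBroadcastJoinThreshold (or broadcasting
> // disabled), the equality on `key` lets the planner pick SortMergeJoin here.
> latestPerKey.explain
> {code}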



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org