You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Marco Gaido (JIRA)" <ji...@apache.org> on 2018/01/13 10:35:00 UTC

[jira] [Commented] (SPARK-22923) Non-equi join(theta join) should use sort merge join

    [ https://issues.apache.org/jira/browse/SPARK-22923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16325074#comment-16325074 ] 

Marco Gaido commented on SPARK-22923:
-------------------------------------

I dob't think SortMergeJoinExec can be used, since the conditions in theta join may be of any kind, therefore you should always compare all the rows from both sides. I think a more interesting question is why CartesianProductExec is used only for InnerLike operations.

> Non-equi join(theta join) should use sort merge join
> ----------------------------------------------------
>
>                 Key: SPARK-22923
>                 URL: https://issues.apache.org/jira/browse/SPARK-22923
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 2.2.1
>            Reporter: Juhong Jung
>
> I found this issue during self join for retrieving same key last record problem. (similar with https://stackoverflow.com/questions/1313120/retrieving-the-last-record-in-each-group)
> Currently, SortMergeJoinExec is chosen only if join expression include equality expression cause SparkStrategies pattern matcher use ExtractEquiJoinKeys. (See SparkStrategies.scala apply method).
> When join with non-equi condition only expression, that expression is not matched with ExtractEquiJoinKeys and go to last case, so BroadcastNestedLoopJoinExec is chosen even if data size is larger than spark.sql.autoBroadcastJoinThreshold.
> For example, Dataframe is about 50G and spark.sql.autoBroadcastJoinThreshold is 10MB, but BroadcastNestedLoopJoinExec is chosen and large size dataframe is sent to driver to broadcast.
> Now the job is aborted because of spark.driver.maxResultSize option or driver container is dead because of OutOfMemory.
> I think sort merge join is good join strategy for non-equi join, so maybe modifying pattern matcher is one of way to being spark planner chose sort merge join for non-equi join.
> And, I have just added trivial equal condition to join expression for using sort merge join.
> Below code is sample.
> {code:java}
> val data = (1 to 10000).
>   map(t => (t, scala.util.Random.nextInt(10000))).
>   toDF("id", "number").
>   dropDuplicates("number").
>   as("data")
> val laterData = data.
>   as("laterData").
>   select(data.schema.fields.map(f => col(f.name).as("later_" + f.name)): _*)
> val latestData = data.
>   join(
>     laterData,
>     'number < 'later_number,
>     "leftouter"
>   ).
>   filter('later_id.isNull)
> latestData.explain
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org