Posted to issues@spark.apache.org by "Ruslan Dautkhanov (JIRA)" <ji...@apache.org> on 2018/01/17 18:26:00 UTC

[jira] [Commented] (SPARK-8682) Range Join for Spark SQL

    [ https://issues.apache.org/jira/browse/SPARK-8682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16329150#comment-16329150 ] 

Ruslan Dautkhanov commented on SPARK-8682:
------------------------------------------

Range joins need some serious optimization in Spark.

Even range-joining small datasets is exceptionally slow in Spark 2.2, which uses a Broadcast Nested Loop Join for these queries.
For example, a 30m x 30k join producing 30m records (each range-join lookup matches exactly one record in this case) takes 6 minutes even when using tens of vcores.
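
For reference, a minimal PySpark sketch that reproduces the fallback (the tables and column names here are made up, not from the real workload). Because there is no equality predicate in the join condition, the physical plan ends up as BroadcastNestedLoopJoin (or a Cartesian product):
{noformat}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical data: a large "events" table (~30m rows) and a small
# "ranges" table (~30k non-overlapping [start, end) intervals).
events = spark.range(0, 30 * 1000 * 1000).withColumnRenamed("id", "ts")
ranges = (spark.range(0, 30 * 1000)
          .select((F.col("id") * 1000).alias("start"),
                  ((F.col("id") + 1) * 1000).alias("end")))

# Pure range predicate, no equi-join key: even with an explicit broadcast
# hint this can only be planned as a (broadcast) nested loop join.
joined = events.join(F.broadcast(ranges),
                     (events.ts >= ranges["start"]) & (events.ts < ranges["end"]))
joined.explain()  # physical plan shows BroadcastNestedLoopJoin
{noformat}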

Folks have even come up with interesting workarounds for this, using a Python UDF, a Python intersection module, and broadcast variables:
[https://stackoverflow.com/a/37955947/470583]. I actually quite liked that approach from [~zero323].
It would be great if something similar were implemented natively in Spark.
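
A rough sketch of that broadcast-variable idea (my own paraphrase, not the code from the linked answer): it continues from the hypothetical events/ranges tables in the snippet above, uses the stdlib bisect module for the lookup, and assumes non-overlapping ranges so each row matches at most one range:
{noformat}
from bisect import bisect_right
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

# Collect the small side to the driver, sort it, and broadcast it.
small = sorted((r["start"], r["end"]) for r in ranges.select("start", "end").collect())
starts = [lo for lo, _ in small]
b_small = spark.sparkContext.broadcast(small)
b_starts = spark.sparkContext.broadcast(starts)

def match_start(ts):
    """Binary-search the broadcast ranges: O(log M) per probe row."""
    i = bisect_right(b_starts.value, ts) - 1
    if i >= 0 and b_small.value[i][0] <= ts < b_small.value[i][1]:
        return b_small.value[i][0]        # key of the (single) matching range
    return None

match_udf = udf(match_start, LongType())

# Tag each event with its range key, then finish with a cheap equi-join.
tagged = events.withColumn("range_start", match_udf(events.ts))
result = tagged.join(ranges, tagged.range_start == ranges["start"])
{noformat}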

> Range Join for Spark SQL
> ------------------------
>
>                 Key: SPARK-8682
>                 URL: https://issues.apache.org/jira/browse/SPARK-8682
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>            Reporter: Herman van Hovell
>            Priority: Major
>         Attachments: perf_testing.scala
>
>
> Currently Spark SQL uses a Broadcast Nested Loop join (or a filtered Cartesian Join) when it has to execute the following range query:
> {noformat}
> SELECT A.*,
>        B.*
> FROM   tableA A
>        JOIN tableB B
>         ON A.start <= B.end
>          AND A.end > B.start
> {noformat}
> This is horribly inefficient. The performance of this query can be greatly improved, when one of the tables can be broadcast, by creating a range index. A range index is basically a sorted map containing the rows of the smaller table, indexed by both the high and low keys. Using this structure, the complexity of the query would go from O(N * M) to O(N * 2 * log(M)), where N = number of records in the larger table and M = number of records in the smaller (indexed) table.
> I have created a pull request for this. According to the [Spark SQL: Relational Data Processing in Spark|http://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf] paper, similar work (page 11, section 7.2) has already been done by the ADAM project (I cannot locate the code, though). 
> Any comments and/or feedback are greatly appreciated.
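
To make the range-index idea from the quoted description concrete, here is a small standalone sketch (plain Python, not Spark internals; the class and the sample data are made up). It assumes the indexed table's intervals are sorted so that both low and high keys are non-decreasing, e.g. non-overlapping ranges; each probe row then needs only two binary searches, which is where the O(N * 2 * log(M)) estimate comes from:
{noformat}
from bisect import bisect_right

class RangeIndex:
    """Toy range index over the smaller (broadcast) table.

    Rows are (low, high, payload) tuples, and the lookup implements the
    join condition low <= probe_high AND high > probe_low from the SQL above.
    """
    def __init__(self, rows):
        self.rows = sorted(rows)
        self.lows = [r[0] for r in self.rows]
        self.highs = [r[1] for r in self.rows]

    def overlaps(self, probe_low, probe_high):
        last = bisect_right(self.lows, probe_high)   # keep rows with low <= probe_high
        first = bisect_right(self.highs, probe_low)  # drop rows with high <= probe_low
        return self.rows[first:last]

idx = RangeIndex([(0, 10, "a"), (10, 20, "b"), (25, 30, "c")])
print(idx.overlaps(5, 12))   # [(0, 10, 'a'), (10, 20, 'b')]
{noformat}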



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org