You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Josh Rosen (JIRA)" <ji...@apache.org> on 2014/12/22 23:29:13 UTC

[jira] [Resolved] (SPARK-4818) Join operation should use iterator/lazy evaluation

     [ https://issues.apache.org/jira/browse/SPARK-4818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Rosen resolved SPARK-4818.
-------------------------------
       Resolution: Fixed
    Fix Version/s: 1.2.1
                   1.3.0
                   1.1.2

Issue resolved by pull request 3671
[https://github.com/apache/spark/pull/3671]

> Join operation should use iterator/lazy evaluation
> --------------------------------------------------
>
>                 Key: SPARK-4818
>                 URL: https://issues.apache.org/jira/browse/SPARK-4818
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.1.1
>            Reporter: Johannes Simon
>             Fix For: 1.1.2, 1.3.0, 1.2.1
>
>
> The current implementation of the join operation does not use an iterator (i.e. lazy evaluation), causing it to explicitly evaluate the co-grouped values. In big data applications, these value collections can be very large. This causes the *cartesian product of all co-grouped values* for a specific key of both RDDs to be kept in memory during the flatMapValues operation, resulting in an *O(size(pair._1)*size(pair._2))* memory consumption instead of *O(1)*. Very large value collections will therefore cause "GC overhead limit exceeded" exceptions and fail the task, or at least slow down execution dramatically.
> {code:title=PairRDDFunctions.scala|borderStyle=solid}
> //...
> def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
>   this.cogroup(other, partitioner).flatMapValues( pair =>
>     for (v <- pair._1; w <- pair._2) yield (v, w)
>   )
> }
> //...
> {code}
> Since cogroup returns an Iterable instance of an Array, the join implementation could be changed to the following, which uses lazy evaluation instead, and has almost no memory overhead:
> {code:title=PairRDDFunctions.scala|borderStyle=solid}
> //...
> def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
>   this.cogroup(other, partitioner).flatMapValues( pair =>
>     for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
>   )
> }
> //...
> {code}
> Alternatively, if the current implementation is intentionally not using lazy evaluation for some reason, there could be a *lazyJoin()* method next to the original join implementation that utilizes lazy evaluation. This of course applies to other join operations as well.
> Thanks! :)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org