Posted to issues@spark.apache.org by "Johannes Simon (JIRA)" <ji...@apache.org> on 2014/12/10 17:58:13 UTC

[jira] [Created] (SPARK-4818) Join operation should use iterator/lazy evaluation

Johannes Simon created SPARK-4818:
-------------------------------------

             Summary: Join operation should use iterator/lazy evaluation
                 Key: SPARK-4818
                 URL: https://issues.apache.org/jira/browse/SPARK-4818
             Project: Spark
          Issue Type: Improvement
          Components: Spark Core
    Affects Versions: 1.1.1
            Reporter: Johannes Simon
            Priority: Minor


The current implementation of the join operation does not use an iterator (i.e. lazy evaluation), so it eagerly materializes the joined values for each key. In big data applications, the co-grouped value collections can be very large. As a result, the *cartesian product of all co-grouped values* for a given key of both RDDs is held in memory during the flatMapValues operation, which costs *O(size(pair._1) * size(pair._2))* additional memory instead of *O(1)*. Very large value collections will therefore cause "GC overhead limit exceeded" errors and fail the task, or at least slow down execution dramatically.

{code:title=PairRDDFunctions.scala|borderStyle=solid}
//...
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1; w <- pair._2) yield (v, w)
  )
}
//...
{code}

Since cogroup returns each group of values as an Iterable backed by an array, the join implementation could be changed to the following, which evaluates lazily and has almost no memory overhead:
{code:title=PairRDDFunctions.scala|borderStyle=solid}
//...
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
//...
{code}
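
To illustrate the difference outside of Spark, here is a minimal stand-alone sketch. It is not Spark code: the object LazyJoinSketch and the helpers strictPairs and lazyPairs are hypothetical names made up for this example, and the two small Seq values only stand in for the co-grouped values of a single key. It assumes strict collections such as Seq as input.
{code:title=LazyJoinSketch.scala|borderStyle=solid}
// Stand-alone illustration (no Spark required); all names are hypothetical.
object LazyJoinSketch {

  // Strict variant: the for-comprehension runs on the collections themselves,
  // so for strict inputs the whole cross product is built before returning.
  def strictPairs[V, W](vs: Iterable[V], ws: Iterable[W]): Iterable[(V, W)] =
    for (v <- vs; w <- ws) yield (v, w)

  // Lazy variant: the for-comprehension runs on iterators, so each pair is
  // only created when the consumer asks for it.
  def lazyPairs[V, W](vs: Iterable[V], ws: Iterable[W]): Iterator[(V, W)] =
    for (v <- vs.iterator; w <- ws.iterator) yield (v, w)

  def main(args: Array[String]): Unit = {
    val vs = Seq(1, 2, 3)
    val ws = Seq("a", "b")

    var created = 0 // counts how many pairs have actually been built
    val it = for (v <- vs.iterator; w <- ws.iterator) yield { created += 1; (v, w) }

    println(created)   // 0: nothing has been built yet
    println(it.next()) // (1,a)
    println(created)   // 1: only the requested pair was built

    println(strictPairs(vs, ws).size) // 6: the full cross product is in memory
  }
}
{code}
Both variants yield the same pairs in the same order; only the point in time at which the pairs are created differs.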

Alternatively, if the current implementation avoids lazy evaluation intentionally, a *lazyJoin()* method could be added next to the original join implementation that evaluates lazily. The same of course applies to the other join operations as well.

Thanks! :)


