Posted to issues@spark.apache.org by "Johannes Simon (JIRA)" <ji...@apache.org> on 2014/12/10 17:58:13 UTC
[jira] [Created] (SPARK-4818) Join operation should use iterator/lazy evaluation
Johannes Simon created SPARK-4818:
-------------------------------------
Summary: Join operation should use iterator/lazy evaluation
Key: SPARK-4818
URL: https://issues.apache.org/jira/browse/SPARK-4818
Project: Spark
Issue Type: Improvement
Components: Spark Core
Affects Versions: 1.1.1
Reporter: Johannes Simon
Priority: Minor
The current implementation of the join operation does not use an iterator (i.e. lazy evaluation): the for-comprehension over the co-grouped values eagerly materializes its result. In big data applications these value collections can be very large. As a consequence, the *Cartesian product of all co-grouped values* for a given key of both RDDs is held in memory during the flatMapValues operation, resulting in *O(size(pair._1)*size(pair._2))* memory consumption instead of *O(1)*. Very large value collections will therefore cause "GC overhead limit exceeded" exceptions and fail the task, or at least slow execution down dramatically.
{code:title=PairRDDFunctions.scala|borderStyle=solid}
//...
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1; w <- pair._2) yield (v, w)
  )
}
//...
{code}
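To make the difference concrete, here is a small, self-contained Scala sketch (independent of Spark; the object name is just for illustration) showing that a for-comprehension over two Iterables materializes the full cross product, while one over iterators produces each pair on demand:
{code:title=LazyVsStrict.scala|borderStyle=solid}
object LazyVsStrict {
  def main(args: Array[String]): Unit = {
    val vs: Iterable[Int] = 1 to 3
    val ws: Iterable[Int] = 4 to 5

    // Strict: desugars to vs.flatMap(v => ws.map(w => (v, w))) and builds
    // the complete cross product as a concrete collection up front.
    val strict: Iterable[(Int, Int)] = for (v <- vs; w <- ws) yield (v, w)
    println(strict.size)   // 6 -- all pairs held in memory at once

    // Lazy: desugars to vs.iterator.flatMap(v => ws.iterator.map(...));
    // a fresh inner iterator is created per outer element, and each pair
    // only exists while it is being consumed.
    val lazily: Iterator[(Int, Int)] =
      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
    println(lazily.next()) // (1,4) -- computed on demand
  }
}
{code}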
Since cogroup returns the grouped values as Iterable instances (backed by arrays), the join implementation could be changed to the following, which uses lazy evaluation instead and has almost no memory overhead:
{code:title=PairRDDFunctions.scala|borderStyle=solid}
//...
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
//...
{code}
Alternatively, if the current implementation intentionally avoids lazy evaluation for some reason, a *lazyJoin()* method could be offered next to the original join implementation that utilizes lazy evaluation, as sketched below. This of course applies to the other join operations (e.g. leftOuterJoin, rightOuterJoin) as well.
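For illustration, a minimal sketch of what such a *lazyJoin()* could look like (a suggestion only, mirroring the lazy variant above; this method does not exist in Spark):
{code:title=PairRDDFunctions.scala|borderStyle=solid}
//...
// Hypothetical: identical to join except that the co-grouped Iterables
// are traversed via iterators, so the cross product is never materialized.
// flatMapValues accepts a TraversableOnce, which an Iterator satisfies.
def lazyJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}
//...
{code}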
Thanks! :)