Posted to user@spark.apache.org by rose kunj <ro...@yahoo.com> on 2014/02/03 05:47:21 UTC

Re: Hash Join in Spark

Since my earlier question is still unanswered, I have decided to dig into the Spark code myself. However, I am new to Spark and to Scala in particular. Can someone help me understand the following code snippet:

1. def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
2.   val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
3.   val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
4.   prfs.mapValues { case Seq(vs, ws) =>
       (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
5.   }
6. }


Thanks,
rose


On Friday, January 24, 2014 4:32 PM, rose <ro...@yahoo.com> wrote:
 
Hi all,

I want to know more about the join operation in Spark. I know it uses a
hash join, but I am not able to figure out the nature of the implementation,
such as whether it is blocking or non-blocking, and whether partitions are
shared or not shared.

If anyone knows, please reply to this post along with links to the
implementation in the Spark source files.

Thanks,
rose



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Hash-Join-in-Spark-tp873.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Hash Join in Spark

Posted by Aaron Davidson <il...@gmail.com>.
This method is doing very little. Line 2 constructs the CoGroupedRDD, which
does all the real work. Note that while this cogroup function groups just
2 RDDs together, CoGroupedRDD allows general n-way cogrouping, so it takes
a Seq[RDD[(K, _)]] rather than just 2 such key-value RDDs.
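
To make the n-way shape concrete, here is a minimal sketch on plain Scala
collections rather than RDDs. The names CogroupSketch and cogroupN are
hypothetical and are not part of Spark; the sketch only mirrors the shape of
the result (one inner Seq per input, in input order) and says nothing about
how CoGroupedRDD partitions or shuffles data.

```scala
object CogroupSketch {
  // Hypothetical local stand-in for what CoGroupedRDD produces: group any
  // number of key-value collections by key, yielding one inner Seq per input.
  def cogroupN[K](inputs: Seq[Seq[(K, Any)]]): Map[K, Seq[Seq[Any]]] = {
    // Every key that appears in at least one input.
    val keys = inputs.flatMap(_.map(_._1)).distinct
    keys.map { k =>
      // For each input, collect the values stored under k (possibly none).
      k -> inputs.map(in => in.filter(_._1 == k).map(_._2))
    }.toMap
  }
}
```

With two inputs, each key maps to a Seq of exactly 2 inner Seqs, which is
the case the cogroup method above relies on. A key missing from one input
still gets an (empty) inner Seq for that input.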

The rest of the code in this method simply converts the result of
CoGroupedRDD back from its generalized form into an RDD[(K, (Seq[V],
Seq[W]))]. (CoGroupedRDD returns an RDD[(K, Seq[Seq[_]])], as there are n
of those Seq[_]s, one per grouped RDD.) To go over some of the finer points
of the remaining lines:

3. This line is actually not necessary, and is simply confusing. I have
submitted a small patch
<https://github.com/apache/incubator-spark/pull/530> to remove it.*

4. mapValues iterates over the results of the CoGroupedRDD (i.e., the
already-cogrouped values) and changes the type of each value from the
generic Seq[Seq[_]] to a (Seq[V], Seq[W]). This is safe because we know the
outer Seq has exactly 2 elements, one per input RDD. The remainder of this
line simply casts each inner Seq[_] to Seq[V] or Seq[W] as appropriate.
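
The pattern match and casts in line 4 can be tried out on an ordinary Map.
This is only a sketch: NarrowSketch and narrow are hypothetical names, and a
plain map over the entries stands in for the RDD's mapValues.

```scala
object NarrowSketch {
  // Narrow the generic two-input shape to a typed pair, as line 4 does.
  // `case (k, Seq(vs, ws))` only matches when the outer Seq has exactly
  // 2 elements; any other length throws a MatchError.
  // The casts are unchecked: because of type erasure, a wrong V or W is not
  // detected here, only later when an element is used at the wrong type.
  def narrow[K, V, W](grouped: Map[K, Seq[Seq[_]]]): Map[K, (Seq[V], Seq[W])] =
    grouped.map { case (k, Seq(vs, ws)) =>
      (k, (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]))
    }
}
```

For example, NarrowSketch.narrow[Int, String, Int] applied to
Map(1 -> Seq(Seq("a", "b"), Seq(10))) gives a Map whose value for key 1 is
the pair (Seq("a", "b"), Seq(10)).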

* Here's a real explanation for line 3, in case you're curious about the
Scala magic that's going on. Normally, all RDDs of key-value pairs (that
is, RDDs whose element type is a Tuple2, such as RDD[(K, V)]) are
implicitly converted
<http://tomjefferys.blogspot.com/2011/11/implicit-conversions-in-scala.html>
to PairRDDFunctions, which provides extra functions that can operate over
these types of RDDs. For reasons that are slightly unclear, the author of
this code chose to forgo the implicit conversion in favor of explicitly
wrapping the CoGroupedRDD in a PairRDDFunctions in order to gain access to
the mapValues method.
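
For the curious, the difference between the implicit and the explicit route
can be sketched without Spark. MiniRDD, MiniPairFunctions and
toPairFunctions are hypothetical stand-ins, not Spark classes, and the
sketch leaves out the ClassTag arguments that the real line 3 passes.

```scala
import scala.language.implicitConversions

object ImplicitSketch {
  // Hypothetical stand-in for RDD: just wraps a local Seq.
  class MiniRDD[T](val items: Seq[T])

  // Hypothetical stand-in for PairRDDFunctions: extra methods that only
  // make sense when the elements are key-value pairs.
  class MiniPairFunctions[K, V](self: MiniRDD[(K, V)]) {
    def mapValues[U](f: V => U): MiniRDD[(K, U)] =
      new MiniRDD(self.items.map { case (k, v) => (k, f(v)) })
  }

  // The implicit conversion: any MiniRDD of pairs gains mapValues.
  implicit def toPairFunctions[K, V](rdd: MiniRDD[(K, V)]): MiniPairFunctions[K, V] =
    new MiniPairFunctions(rdd)

  val rdd = new MiniRDD(Seq(1 -> "a", 2 -> "bb"))

  // Implicit route: the compiler rewrites this to toPairFunctions(rdd).mapValues(...).
  val viaImplicit: MiniRDD[(Int, Int)] = rdd.mapValues(_.length)

  // Explicit route, analogous to line 3 of the quoted snippet.
  val viaExplicit: MiniRDD[(Int, Int)] = new MiniPairFunctions(rdd).mapValues(_.length)
}
```

Both routes call the same mapValues and produce the same items; the
explicit wrapper only spells out what the compiler would otherwise insert.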

