Posted to dev@spark.apache.org by Alessandro Baretta <al...@gmail.com> on 2015/01/15 16:39:43 UTC

Join implementation in SparkSQL

Hello,

Where can I find docs about how joins are implemented in SparkSQL? In
particular, I'd like to know whether they are implemented according to
their relational algebra definition as filters on top of a cartesian
product.

Thanks,

Alex

Re: Join implementation in SparkSQL

Posted by Yin Huai <yh...@databricks.com>.
Hi Alex,

Can you attach the output of sql("explain extended <your
query>").collect.foreach(println)?

Thanks,

Yin

On Fri, Jan 16, 2015 at 1:54 PM, Alessandro Baretta <al...@gmail.com>
wrote:

> Reynold,
>
> The source file you are directing me to is a little too terse for me to
> understand what exactly is going on. Let me tell you what I'm trying to do
> and what problems I'm encountering, so that you might be able to better
> direct my investigation of the SparkSQL codebase.
>
> I am computing the join of three tables, sharing the same primary key,
> composed of three fields, and having several other fields. My first attempt
> at computing this join was in SQL, with a query much like this slightly
> simplified one:
>
>          SELECT
>           a.key1 key1, a.key2 key2, a.key3 key3,
>           a.data1   adata1,    a.data2    adata2,    ...
>           b.data1   bdata1,    b.data2    bdata2,    ...
>           c.data1   cdata1,    c.data2    cdata2,    ...
>         FROM a, b, c
>         WHERE
>           a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3 AND
>           b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3
>
> This code yielded a SparkSQL job containing 40,000 stages, which failed
> after filling up all available disk space on the worker nodes.
>
> I then wrote this join as a plain mapreduce join. The code looks roughly
> like this:
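> import org.apache.spark.rdd.UnionRDD
> // Tag each row with its source table and key it by the shared primary key.
> val a_ = a.map(row => (key(row), ("a", row)))
> val b_ = b.map(row => (key(row), ("b", row)))
> val c_ = c.map(row => (key(row), ("c", row)))
> // UnionRDD is a class, so it needs `new`; sc.union(a_, b_, c_) is equivalent.
> val join = new UnionRDD(sc, List(a_, b_, c_)).groupByKey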
>
> This implementation yields approximately 1600 stages and completes in a few
> minutes on a 256-core cluster. The huge difference in scale of the two jobs
> makes me think that SparkSQL is implementing my join as a cartesian product.
> This is the query plan. I'm not sure I can read it, but it does seem to
> imply that the filter conditions are not being pushed far enough down:
>
>  'Project [...]
>  'Filter (((((('a.key1 = 'b.key1)) && ('a.key2 = b.key2)) && ...)
>   'Join Inner, None
>    'Join Inner, None
>
> Is SparkSQL perhaps unable to push join conditions down from the WHERE
> clause into the join itself?
>
> Alex
>
> On Thu, Jan 15, 2015 at 10:36 AM, Reynold Xin <rx...@databricks.com> wrote:
>
> > It's a bunch of strategies defined here:
> >
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
> >
> > In most common use cases (e.g. inner equi join), filters are pushed below
> > the join or into the join. Doing a cartesian product followed by a filter
> > is too expensive.
> >
> >
> > On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <alexbaretta@gmail.com> wrote:
> >
> >> Hello,
> >>
> >> Where can I find docs about how joins are implemented in SparkSQL? In
> >> particular, I'd like to know whether they are implemented according to
> >> their relational algebra definition as filters on top of a cartesian
> >> product.
> >>
> >> Thanks,
> >>
> >> Alex
> >>
> >
> >
>

Re: Join implementation in SparkSQL

Posted by Alessandro Baretta <al...@gmail.com>.
Reynold,

The source file you are directing me to is a little too terse for me to
understand what exactly is going on. Let me tell you what I'm trying to do
and what problems I'm encountering, so that you might be able to better
direct my investigation of the SparkSQL codebase.

I am computing the join of three tables, sharing the same primary key,
composed of three fields, and having several other fields. My first attempt
at computing this join was in SQL, with a query much like this slightly
simplified one:

         SELECT
          a.key1 key1, a.key2 key2, a.key3 key3,
          a.data1   adata1,    a.data2    adata2,    ...
          b.data1   bdata1,    b.data2    bdata2,    ...
          c.data1   cdata1,    c.data2    cdata2,    ...
        FROM a, b, c
        WHERE
          a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3 AND
          b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3

This code yielded a SparkSQL job containing 40,000 stages, which failed
after filling up all available disk space on the worker nodes.

I then wrote this join as a plain mapreduce join. The code looks roughly
like this:
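import org.apache.spark.rdd.UnionRDD
// Tag each row with its source table and key it by the shared primary key.
val a_ = a.map(row => (key(row), ("a", row)))
val b_ = b.map(row => (key(row), ("b", row)))
val c_ = c.map(row => (key(row), ("c", row)))
// UnionRDD is a class, so it needs `new`; sc.union(a_, b_, c_) is equivalent.
val join = new UnionRDD(sc, List(a_, b_, c_)).groupByKey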
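
The same tag-union-group idea can be tried without a cluster. The following is a minimal sketch on plain Scala collections, not Spark code: the Row case class, the key helper and the innerJoinKeys function are hypothetical names introduced only for illustration. It shows that grouping the tagged union by key touches each row once, with no pairwise comparison between tables.

```scala
object TaggedUnionJoin {
  // Hypothetical key and row types, for illustration only.
  type Key = (Int, Int, Int)
  final case class Row(key1: Int, key2: Int, key3: Int, data: String)

  def key(row: Row): Key = (row.key1, row.key2, row.key3)

  // Tag every row with its table name, concatenate the three tables,
  // then group by the shared key.
  def cogroup(a: Seq[Row], b: Seq[Row], c: Seq[Row]): Map[Key, Seq[(String, Row)]] = {
    val tagged =
      a.map(row => (key(row), ("a", row))) ++
      b.map(row => (key(row), ("b", row))) ++
      c.map(row => (key(row), ("c", row)))
    tagged.groupBy(_._1).map { case (k, pairs) => k -> pairs.map(_._2) }
  }

  // An inner join only keeps keys that occur in all three tables.
  def innerJoinKeys(groups: Map[Key, Seq[(String, Row)]]): Set[Key] =
    groups.filter { case (_, rows) => rows.map(_._1).toSet == Set("a", "b", "c") }.keySet
}
```

Note that grouping alone behaves like a full outer join on the key; a filter such as innerJoinKeys is still needed to get inner-join semantics.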

This implementation yields approximately 1600 stages and completes in a few
minutes on a 256-core cluster. The huge difference in scale of the two jobs
makes me think that SparkSQL is implementing my join as a cartesian product.
This is the query plan. I'm not sure I can read it, but it does seem to
imply that the filter conditions are not being pushed far enough down:

 'Project [...]
 'Filter (((((('a.key1 = 'b.key1)) && ('a.key2 = b.key2)) && ...)
  'Join Inner, None
   'Join Inner, None

Is SparkSQL perhaps unable to push join conditions down from the WHERE
clause into the join itself?

Alex
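
To make "pushing join conditions from the WHERE clause into the join" concrete, here is a toy rewrite over a tiny plan tree shaped like the one printed above (a Filter over two nested Joins with no condition). It is only a sketch under stated assumptions: Scan, Join, Filter, Eq and push are hypothetical names, not Catalyst's classes, and this is not a description of how SparkSQL's optimizer is actually written.

```scala
object PushdownSketch {
  // Toy logical plan; every name here is hypothetical.
  sealed trait Plan { def tables: Set[String] }
  final case class Scan(table: String) extends Plan {
    def tables: Set[String] = Set(table)
  }
  final case class Join(left: Plan, right: Plan, condition: List[Eq]) extends Plan {
    def tables: Set[String] = left.tables ++ right.tables
  }
  final case class Filter(predicates: List[Eq], child: Plan) extends Plan {
    def tables: Set[String] = child.tables
  }

  // An equality between columns of two tables, e.g. a.key1 = b.key1.
  final case class Eq(leftTable: String, leftCol: String, rightTable: String, rightCol: String) {
    def tables: Set[String] = Set(leftTable, rightTable)
  }

  // Attach each predicate to the lowest join whose inputs cover both of its tables.
  // Returns the rewritten plan plus the predicates that could not be placed in it.
  def push(plan: Plan, preds: List[Eq]): (Plan, List[Eq]) = plan match {
    case Join(l, r, cond) =>
      val (newLeft, afterLeft) = push(l, preds)
      val (newRight, afterRight) = push(r, afterLeft)
      val (here, rest) = afterRight.partition { p =>
        p.tables.subsetOf(newLeft.tables ++ newRight.tables)
      }
      (Join(newLeft, newRight, cond ++ here), rest)
    case Filter(ps, child) =>
      val (newChild, rest) = push(child, preds ++ ps)
      // Predicates the child can evaluate stay in a Filter; the others go back up.
      val (keep, up) = rest.partition(_.tables.subsetOf(newChild.tables))
      (if (keep.isEmpty) newChild else Filter(keep, newChild), up)
    case scan: Scan => (scan, preds)
  }
}
```

Applied to a Filter holding a.key1 = b.key1 and b.key1 = c.key1 over Join(Join(a, b), c), the rewrite leaves no Filter at the top: the first predicate ends up on the inner join and the second on the outer one, so each join has an equality condition it could be executed with.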

On Thu, Jan 15, 2015 at 10:36 AM, Reynold Xin <rx...@databricks.com> wrote:

> It's a bunch of strategies defined here:
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
>
> In most common use cases (e.g. inner equi join), filters are pushed below
> the join or into the join. Doing a cartesian product followed by a filter
> is too expensive.
>
>
> On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <alexbaretta@gmail.com> wrote:
>
>> Hello,
>>
>> Where can I find docs about how joins are implemented in SparkSQL? In
>> particular, I'd like to know whether they are implemented according to
>> their relational algebra definition as filters on top of a cartesian
>> product.
>>
>> Thanks,
>>
>> Alex
>>
>
>

RE: Join implementation in SparkSQL

Posted by "Cheng, Hao" <ha...@intel.com>.
I'm not sure I understand your question, but SparkStrategies.scala and Optimizer.scala are a good place to start if you want the details of the join implementation or optimization.

-----Original Message-----
From: Andrew Ash [mailto:andrew@andrewash.com] 
Sent: Friday, January 16, 2015 4:52 AM
To: Reynold Xin
Cc: Alessandro Baretta; dev@spark.apache.org
Subject: Re: Join implementation in SparkSQL

What Reynold is describing is a performance optimization in implementation, but the semantics of the join (cartesian product plus relational algebra
filter) should be the same and produce the same results.

On Thu, Jan 15, 2015 at 1:36 PM, Reynold Xin <rx...@databricks.com> wrote:

> It's a bunch of strategies defined here:
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
>
> In most common use cases (e.g. inner equi join), filters are pushed 
> below the join or into the join. Doing a cartesian product followed by 
> a filter is too expensive.
>
>
> On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <alexbaretta@gmail.com> wrote:
>
> > Hello,
> >
> > Where can I find docs about how joins are implemented in SparkSQL? 
> > In particular, I'd like to know whether they are implemented 
> > according to their relational algebra definition as filters on top 
> > of a cartesian product.
> >
> > Thanks,
> >
> > Alex
> >
>


Re: Join implementation in SparkSQL

Posted by Andrew Ash <an...@andrewash.com>.
What Reynold is describing is a performance optimization in implementation,
but the semantics of the join (cartesian product plus relational algebra
filter) should be the same and produce the same results.

On Thu, Jan 15, 2015 at 1:36 PM, Reynold Xin <rx...@databricks.com> wrote:

> It's a bunch of strategies defined here:
>
> https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
>
> In most common use cases (e.g. inner equi join), filters are pushed below
> the join or into the join. Doing a cartesian product followed by a filter
> is too expensive.
>
>
> On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <alexbaretta@gmail.com> wrote:
>
> > Hello,
> >
> > Where can I find docs about how joins are implemented in SparkSQL? In
> > particular, I'd like to know whether they are implemented according to
> > their relational algebra definition as filters on top of a cartesian
> > product.
> >
> > Thanks,
> >
> > Alex
> >
>

Re: Join implementation in SparkSQL

Posted by Reynold Xin <rx...@databricks.com>.
It's a bunch of strategies defined here:
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala

In most common use cases (e.g. inner equi join), filters are pushed below
the join or into the join. Doing a cartesian product followed by a filter
is too expensive.
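
The cost difference can be illustrated on plain Scala collections. This is a minimal sketch, not the code in SparkStrategies.scala: Rec, cartesianThenFilter and hashJoin are hypothetical names. Both functions return the same pairs, but the first examines every combination of rows while the second indexes one side by key and probes it once per row of the other side.

```scala
object JoinStrategies {
  // Hypothetical two-column rows: a join key and a payload.
  final case class Rec(key: Int, value: String)

  // Relational-algebra definition: form every pair, then filter on the join condition.
  // Examines left.size * right.size pairs.
  def cartesianThenFilter(left: Seq[Rec], right: Seq[Rec]): Seq[(Rec, Rec)] =
    for (l <- left; r <- right if l.key == r.key) yield (l, r)

  // Equi-join with the condition pushed into the join: build an index on one side,
  // then look up matching rows for each row of the other side.
  def hashJoin(left: Seq[Rec], right: Seq[Rec]): Seq[(Rec, Rec)] = {
    val index: Map[Int, Seq[Rec]] = right.groupBy(_.key)
    for (l <- left; r <- index.getOrElse(l.key, Seq.empty)) yield (l, r)
  }
}
```

The index lookup only works because the join condition is an equality on a key, which is why the equi-join case is the one that benefits.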


On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <al...@gmail.com>
wrote:

> Hello,
>
> Where can I find docs about how joins are implemented in SparkSQL? In
> particular, I'd like to know whether they are implemented according to
> their relational algebra definition as filters on top of a cartesian
> product.
>
> Thanks,
>
> Alex
>