You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@hive.apache.org by Gautam <ga...@gmail.com> on 2016/03/16 01:37:11 UTC

Tez reducer parallelism ..

Hello,


I'm trying to optimize some queries in Hive that were recently switched to
Tez.. had a general question regarding reducer parallelism ..

A lot of our queries do the following style of simultaneous windowing ..


 SELECT

    row_number() OVER( PARTITION BY app, user, type ORDER BY ts ) as
a_number,

    row_number() OVER( PARTITION BY day, app, user, type ORDER BY ts ) as
type_rank,

    row_number() OVER( PARTITION BY day, app, user   ORDER BY ts ) as
dau_rank,

 FROM messages

 WHERE ...


Since each OVER / PARTITION-By clause is independent they can the put into
parallelized Reducer phases. But what I see is that these get serialized
into M1 -> R1 -> R2 -> R3 .. instead of M1 -> [ R1, R2, R3 ]

Is this something that Tez tries to do at all or an optimization that I can
use to my benefit ?

Cheers,
-Gautam.

Re: Tez reducer parallelism ..

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> So you'r saying, since these windows  are part of a single SELECT
>projection they need to be serial?

Yes, with a full shuffle of the result so far for each new OVER().

>              row_number() OVER( PARTITION BY app, user, type ORDER BY ts
>) as a_number,
>              row_number() OVER( PARTITION BY day, app, user, type ORDER
>BY ts ) as type_rank,
>              row_number() OVER( PARTITION BY day, app, user   ORDER BY
>ts ) as  dau_rank,

You can write your own stateful windowing UDAF which does this over the
a subset window, provided your partition by is pretty wide.

compute_app_dau(app, day, user, type, ts) OVER(PARTITION BY app, user
ORDER BY day, type,
ts) as a_number
compute_type_dau(app, day, user, type, ts) OVER(PARTITION BY app, user
ORDER BY day, type, ts) as type_rank

etc.

It's upto your impl to remember the last row & take avantage of the
ordering to produce ranking along timestamps - hive will distribute by
app, user & sort by app, user, day, type, ts & feed into your UDAF.

If this is mostly done at the user+app tuple, the real skew should only be
the biggest user X app combo.

I'm guessing it can even be a single UDAF returning a Struct with all
numbers in one go, but I haven't dug deeper into
ISupportStreamingModeForWindowing yet.

Cheers,
Gopal



Re: Tez reducer parallelism ..

Posted by Gautam <ga...@gmail.com>.
> The windowing is not simultaneous unless they are all over the same window
> - the following query has 3 different windows applied over the same rows
> sequentially.

Ok. Just wanted to confirm. Maybe I could restructure my query to get more
parallelism ..

> They are all over the same rows so they're done in sequence, so the final
> row-set contains all values, causing multiple shuffles of the same rows.

So you'r saying, since these windows  are part of a single SELECT
projection they need to be serial?



> Does your query only have row_numbers or does it have other columns in
them?

yes.  Something like ...

{code}

 SELECT


              row_number() OVER( PARTITION BY app, user, type ORDER BY
ts ) as a_number,

              row_number() OVER( PARTITION BY day, app, user, type ORDER
BY ts ) as type_rank,

              row_number() OVER( PARTITION BY day, app, user   ORDER BY
ts ) as  dau_rank,

              day,

              user,

              app,

              type,

              ts

FROM messages

{code}



On Tue, Mar 15, 2016 at 6:41 PM, Gopal Vijayaraghavan <go...@apache.org>
wrote:

> > A lot of our queries do the following style of simultaneous windowing ..
>
> The windowing is not simultaneous unless they are all over the same window
> - the following query has 3 different windows applied over the same rows
> sequentially.
>
> > SELECT
> >    row_number() OVER( PARTITION BY app, user,
> > type ORDER BY ts )as a_number,
> >    row_number() OVER( PARTITION BY day, app, user,
> > type ORDER BY ts )as type_rank,
>
> > Since each OVER / PARTITION-By clause is independent they can the put
> >into parallelized Reducer phases.
>
> They are all over the same rows so they're done in sequence, so the final
> row-set contains all values, causing multiple shuffles of the same rows.
>
> > Is this something that Tez tries to do at all or an optimization that I
> >can use to my benefit ?
>
> Does your query only have row_numbers or does it have other columns in
> them?
>
> Cheers,
> Gopal
>
>
>


-- 
"If you really want something in this life, you have to work for it. Now,
quiet! They're about to announce the lottery numbers..."

Re: Tez reducer parallelism ..

Posted by Gopal Vijayaraghavan <go...@apache.org>.
> A lot of our queries do the following style of simultaneous windowing ..

The windowing is not simultaneous unless they are all over the same window
- the following query has 3 different windows applied over the same rows
sequentially.

> SELECT
>    row_number() OVER( PARTITION BY app, user,
> type ORDER BY ts )as a_number,
>    row_number() OVER( PARTITION BY day, app, user,
> type ORDER BY ts )as type_rank,

> Since each OVER / PARTITION-By clause is independent they can the put
>into parallelized Reducer phases.

They are all over the same rows so they're done in sequence, so the final
row-set contains all values, causing multiple shuffles of the same rows.

> Is this something that Tez tries to do at all or an optimization that I
>can use to my benefit ?

Does your query only have row_numbers or does it have other columns in
them?

Cheers,
Gopal