Posted to user@hive.apache.org by Gautam <ga...@gmail.com> on 2016/03/16 01:37:11 UTC
Tez reducer parallelism ..
Hello,
I'm trying to optimize some Hive queries that were recently switched to
Tez, and I have a general question about reducer parallelism.
A lot of our queries use the following style of simultaneous windowing:
SELECT
  row_number() OVER( PARTITION BY app, user, type ORDER BY ts ) as a_number,
  row_number() OVER( PARTITION BY day, app, user, type ORDER BY ts ) as type_rank,
  row_number() OVER( PARTITION BY day, app, user ORDER BY ts ) as dau_rank
FROM messages
WHERE ...
Since each OVER / PARTITION BY clause is independent, they could be put into
parallel reducer phases. But what I see is that they get serialized
into M1 -> R1 -> R2 -> R3 instead of M1 -> [ R1, R2, R3 ].
Is this something that Tez tries to do at all, or an optimization that I can
use to my benefit?
Cheers,
-Gautam.
Re: Tez reducer parallelism ..
Posted by Gopal Vijayaraghavan <go...@apache.org>.
> So you're saying, since these windows are part of a single SELECT
> projection, they need to be serial?
Yes, with a full shuffle of the result so far for each new OVER().
> row_number() OVER( PARTITION BY app, user, type ORDER BY ts ) as a_number,
> row_number() OVER( PARTITION BY day, app, user, type ORDER BY ts ) as type_rank,
> row_number() OVER( PARTITION BY day, app, user ORDER BY ts ) as dau_rank,
You can write your own stateful windowing UDAF that does this over a
subset window, provided your PARTITION BY is pretty wide.
compute_app_dau(app, day, user, type, ts) OVER(PARTITION BY app, user ORDER BY day, type, ts) as a_number,
compute_type_dau(app, day, user, type, ts) OVER(PARTITION BY app, user ORDER BY day, type, ts) as type_rank
etc.
It's up to your implementation to remember the last row and take advantage
of the ordering to produce rankings along timestamps: Hive will distribute by
app, user, sort by app, user, day, type, ts, and feed the rows into your UDAF.
If this is mostly done at the user+app tuple, the real skew should only be
the biggest user X app combo.
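To gauge that skew before going down the UDAF route, one rough check is to count rows per (app, user) key: when rows are distributed by app, user, every row of one key lands on the same reducer, so the biggest group bounds the busiest reducer's input from below. The sketch below does this in Python over an in-memory sample; the dict-per-row format and the function name are assumptions for illustration. On the real table the same number comes from a GROUP BY app, user with count(*).

```python
from collections import Counter


def largest_app_user_group(rows):
    """Return ((app, user), row_count) for the biggest (app, user) key.

    Hypothetical helper. rows: non-empty iterable of dicts with "app" and
    "user" keys. All rows sharing one key go to one reducer, so this count
    is a lower bound on the busiest reducer's input.
    """
    counts = Counter((r["app"], r["user"]) for r in rows)
    return counts.most_common(1)[0]
```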
I'm guessing it can even be a single UDAF returning a Struct with all
numbers in one go, but I haven't dug deeper into
ISupportStreamingModeForWindowing yet.
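The one-pass idea can be sketched outside Hive as follows. This is a Python simulation of the logic only, not a Hive UDAF: it does not use GenericUDAFEvaluator or ISupportStreamingModeForWindowing, and the function name and dict-per-row format are hypothetical. It assumes `day` is derived from `ts`, so ordering by (day, ts) matches ordering by ts. It sorts by (app, user, day, ts) instead of (day, type, ts) because dau_rank orders by ts across all types within a day. With type ahead of ts, a day's rows would arrive grouped by type, and a running counter would no longer follow timestamp order. As with row_number(), ties on ts are numbered in arbitrary order.

```python
from itertools import groupby


def single_pass_ranks(rows):
    """Compute a_number, type_rank and dau_rank with one sort and one scan.

    rows: iterable of dicts with keys app, user, day, type, ts.
    Assumes day is derived from ts, so ordering by (day, ts) matches ordering by ts.
    """
    def partition_key(r):
        return (r["app"], r["user"])

    # Single "shuffle": distribute by (app, user), sort by (app, user, day, ts).
    ordered = sorted(rows, key=lambda r: (r["app"], r["user"], r["day"], r["ts"]))
    out = []
    for _, group in groupby(ordered, key=partition_key):
        per_type = {}      # a_number: PARTITION BY app, user, type ORDER BY ts
        per_day_type = {}  # type_rank: PARTITION BY day, app, user, type ORDER BY ts
        current_day = None
        dau = 0            # dau_rank: PARTITION BY day, app, user ORDER BY ts
        for r in group:
            if r["day"] != current_day:
                # New day inside this (app, user): reset the per-day counters.
                current_day, dau = r["day"], 0
                per_day_type.clear()
            dau += 1
            per_type[r["type"]] = per_type.get(r["type"], 0) + 1
            per_day_type[r["type"]] = per_day_type.get(r["type"], 0) + 1
            # Emit all three numbers at once, like a UDAF returning a struct.
            out.append({**r,
                        "a_number": per_type[r["type"]],
                        "type_rank": per_day_type[r["type"]],
                        "dau_rank": dau})
    return out
```

The state for one (app, user) group is one counter per type plus the per-day counters. That is why a single ordered pass can replace the three separate shuffles.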
Cheers,
Gopal
Re: Tez reducer parallelism ..
Posted by Gautam <ga...@gmail.com>.
> The windowing is not simultaneous unless they are all over the same window
> - the following query has 3 different windows applied over the same rows
> sequentially.
OK. Just wanted to confirm. Maybe I could restructure my query to get more
parallelism.
> They are all over the same rows so they're done in sequence, so the final
> row-set contains all values, causing multiple shuffles of the same rows.
So you're saying, since these windows are part of a single SELECT
projection, they need to be serial?
> Does your query only have row_numbers or does it have other columns in
> them?
Yes. Something like:
{code}
SELECT
  row_number() OVER( PARTITION BY app, user, type ORDER BY ts ) as a_number,
  row_number() OVER( PARTITION BY day, app, user, type ORDER BY ts ) as type_rank,
  row_number() OVER( PARTITION BY day, app, user ORDER BY ts ) as dau_rank,
  day,
  user,
  app,
  type,
  ts
FROM messages
{code}
On Tue, Mar 15, 2016 at 6:41 PM, Gopal Vijayaraghavan <go...@apache.org>
wrote:
> > A lot of our queries do the following style of simultaneous windowing ..
>
> The windowing is not simultaneous unless they are all over the same window
> - the following query has 3 different windows applied over the same rows
> sequentially.
>
> > SELECT
> > row_number() OVER( PARTITION BY app, user,
> > type ORDER BY ts )as a_number,
> > row_number() OVER( PARTITION BY day, app, user,
> > type ORDER BY ts )as type_rank,
>
> > Since each OVER / PARTITION-By clause is independent they can the put
> >into parallelized Reducer phases.
>
> They are all over the same rows so they're done in sequence, so the final
> row-set contains all values, causing multiple shuffles of the same rows.
>
> > Is this something that Tez tries to do at all or an optimization that I
> >can use to my benefit ?
>
> Does your query only have row_numbers or does it have other columns in
> them?
>
> Cheers,
> Gopal
>
>
>
Re: Tez reducer parallelism ..
Posted by Gopal Vijayaraghavan <go...@apache.org>.
> A lot of our queries do the following style of simultaneous windowing ..
The windowing is not simultaneous unless the functions are all over the same
window: the following query has 3 different windows applied sequentially over
the same rows.
> SELECT
> row_number() OVER( PARTITION BY app, user, type ORDER BY ts ) as a_number,
> row_number() OVER( PARTITION BY day, app, user, type ORDER BY ts ) as type_rank,
> Since each OVER / PARTITION BY clause is independent, they could be put
> into parallel reducer phases.
They are all over the same rows, so they're done in sequence and the final
row-set contains all values; this causes multiple shuffles of the same rows.
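A minimal Python model of that sequencing follows. It is not Hive's actual operator code, and the function names and dict-per-row format are hypothetical. Each OVER() becomes a separate stage that re-sorts the entire output of the previous stage by its own PARTITION BY / ORDER BY key. Every stage consumes rows that already carry the columns added before it, so the stages chain as M1 -> R1 -> R2 -> R3 rather than fanning out, and the final row set ends up holding all three values.

```python
from itertools import groupby


def row_number_stage(rows, partition_by, order_by, out_col):
    """One windowing stage: re-sort ALL rows by a new key and number them.

    Models a full shuffle: the stage takes the previous stage's complete
    output and redistributes it by a different partition key.
    """
    def part_key(r):
        return tuple(r[c] for c in partition_by)

    ordered = sorted(rows, key=lambda r: part_key(r) + tuple(r[c] for c in order_by))
    out = []
    for _, group in groupby(ordered, key=part_key):
        for n, r in enumerate(group, start=1):
            out.append({**r, out_col: n})  # row_number() within the partition
    return out


def three_stage_ranks(rows):
    # Each OVER() gets its own stage, fed by the one before: M1 -> R1 -> R2 -> R3.
    r1 = row_number_stage(rows, ["app", "user", "type"], ["ts"], "a_number")
    r2 = row_number_stage(r1, ["day", "app", "user", "type"], ["ts"], "type_rank")
    return row_number_stage(r2, ["day", "app", "user"], ["ts"], "dau_rank")
```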
> Is this something that Tez tries to do at all, or an optimization that I
> can use to my benefit?
Does your query only have row_numbers or does it have other columns in
them?
Cheers,
Gopal