Posted to user@spark.apache.org by Spark User <sp...@gmail.com> on 2016/10/30 18:58:16 UTC

Performance bug in UDAF?

Hi All,

I have a UDAF that seems to perform poorly when its input is skewed. I have
been debugging the UDAF implementation, but I don't see any code that would
cause the performance to degrade. More details on the data and the
experiments I have run are below.

DataSet: Assume 3 columns, Column1 being the key.

Column1   Column2   Column3
a         1         x
a         2         x
a         3         x
a         4         x
a         5         x
a         6         z
...       (5 million rows for a in total)
a         1000000   y
b         9         y
b         9         y
b         10        y
...       (3 million rows for b in total)
...       (more rows)

Total rows: 100 million.


a has 5 million rows; Column2 for a has 1 million unique values.
b has 3 million rows; Column2 for b has 800,000 unique values.

Column3 has only hundreds of unique values (not millions) for both a and b.

In total there are 100 million rows as input to the UDAF aggregation, and
the skew is in the keys a and b. All other rows can be ignored; they do not
cause any performance issues or hot partitions.

The code does a dataSet.groupBy("Column1").agg(udaf("Column2", "Column3")).

I commented out the bodies of the UDAF's update and merge methods, so
essentially the UDAF was doing nothing.
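
For reference, the experiment looks roughly like this. This is a minimal
sketch, not the real implementation: the class name NoOpUdaf, the buffer
schema, and the return type are simplified placeholders.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Sketch of the experiment: a UDAF whose update and merge bodies are
// commented out, so it does no per-row work at all.
class NoOpUdaf extends UserDefinedAggregateFunction {
  override def inputSchema: StructType =
    StructType(StructField("col2", StringType) :: StructField("col3", StringType) :: Nil)
  override def bufferSchema: StructType =
    StructType(StructField("acc", LongType) :: Nil)  // placeholder buffer
  override def dataType: DataType = LongType
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // body commented out for the experiment
  }
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // body commented out for the experiment
  }
  override def evaluate(buffer: Row): Any = buffer.getLong(0)
}

// Usage, matching the call above:
// val udaf = new NoOpUdaf
// dataSet.groupBy("Column1").agg(udaf(col("Column2"), col("Column3")))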

With this code (empty update and merge in the UDAF), the processing time is
16 minutes per micro-batch, each micro-batch containing 100 million rows,
with 5 million rows for a and 1 million unique values of Column2 for a.

But when I pass empty values for Column2, with nothing else changed,
effectively reducing the 1 million unique values for Column2 to a single
unique (empty) value, the batch processing time goes down to 4 minutes.

So I am trying to understand why there is such a big performance
difference. What in the UDAF causes the processing time to increase by
orders of magnitude when the data is skewed as described above?

Any insight from Spark developers, contributors, or anyone else with a
deeper understanding of UDAFs would be helpful.

Thanks,
Bharath

Re: Performance bug in UDAF?

Posted by Spark User <sp...@gmail.com>.
Pinging again on this topic.

Is there an easy way to select the top N in a RelationalGroupedDataset?
In the example below, dataSet.groupBy("Column1").agg(udaf("Column2",
"Column3")) returns a RelationalGroupedDataset. One way to address the data
skew would be to reduce the data per key (Column1 being the key here). If
we are interested in the top N values per column (like Column2, Column3),
how can we get the top N from a RelationalGroupedDataset?

Or is the only way to get the top N to implement it in the UDAF?
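
For concreteness, something like the following is what I have in mind: a
hypothetical, untested sketch using window functions, where the cutoff
n = 100 is just a placeholder.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val n = 100  // placeholder cutoff for "top N"
val byKey = Window.partitionBy("Column1").orderBy(col("Column2").desc)

val topN = dataSet
  .withColumn("rn", row_number().over(byKey))  // 1-based rank within each Column1 group
  .filter(col("rn") <= n)                      // keep at most n rows per key
  .drop("rn")

// The trimmed data could then feed the same aggregation:
// topN.groupBy("Column1").agg(udaf(col("Column2"), col("Column3")))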

Would appreciate any pointers or examples if someone has solved a similar
problem.

Thanks,
Bharath


On Mon, Oct 31, 2016 at 11:40 AM, Spark User <sp...@gmail.com>
wrote:

> [quoted text of earlier messages trimmed]

Re: Performance bug in UDAF?

Posted by Spark User <sp...@gmail.com>.
Trying again. Hoping to find some help in figuring out the performance
bottleneck we are observing.

Thanks,
Bharath

On Sun, Oct 30, 2016 at 11:58 AM, Spark User <sp...@gmail.com>
wrote:

> [quoted text of the original message trimmed]