Posted to dev@flink.apache.org by "Li, Chengxiang" <ch...@intel.com> on 2015/10/15 12:24:59 UTC

A proposal about skew data handling in Flink

Dear all,
In many real-world use cases, data is naturally skewed. For example, in social networks, famous people get far more "follows" than others, and a hot tweet may be forwarded millions of times; likewise, the purchase records of ordinary products can never compare to those of hot products. At the same time, the Flink runtime assumes that all tasks consume the same amount of resources, which is not always true. Skew data handling tries to make skewed data fit into Flink's runtime.
I have written a proposal about skew data handling in Flink; you can read it at https://docs.google.com/document/d/1ma060BUlhXDqeFmviEO7Io4CXLKgrAXIfeDYldvZsKI/edit?usp=sharing.
Any comments and feedback are welcome; you can comment on the Google doc or reply to this email thread directly.

Thanks
Chengxiang

Re: A proposal about skew data handling in Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Yes, that sounds good to me.
Implement support for generic range partitioning first and go for the
non-range-equally-splittable cases later.

Best, Fabian


RE: A proposal about skew data handling in Flink

Posted by "Li, Chengxiang" <ch...@intel.com>.
Thanks a lot for the comments, Fabian. I mostly agree with you on the plan; I just want to add some more thoughts about the Non-Range-Equally-Splittable case here.
1. Let's assume a case in which 10% of the data is skewed on a certain key. In this case, as long as the parallelism is larger than 10, it falls into the Non-Range-Equally-Splittable case. So it should not be much of a corner case of the skew issue.
2. In the proposal, the solution for the Non-Range-Equally-Splittable case is based on 2 new RangePartitioners and a little optimizer logic, which are already touched by steps #1 and #2 of the plan. It does not require changing anything about the operator semantics, so if we have a good partitioner abstraction, I think it does not add much complexity for Flink to handle this kind of issue.
It should not block anything. After finishing the simple case, we will have more knowledge about the implementation details; then we can look back at this issue and decide whether it is worth resolving at that cost.
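
As a rough illustration of point 1 (a sketch only, not code from the proposal): range partitioning keeps all records of one key in one partition, so a key that holds more than 1/parallelism of the data overloads its partition wherever the range boundaries are placed. The function names and the sample distribution below are hypothetical.

```python
def ideal_share(parallelism: int) -> float:
    """Fraction of the data each partition holds when perfectly balanced."""
    return 1.0 / parallelism


def has_oversized_key(key_fractions: dict, parallelism: int) -> bool:
    """True if a single key alone exceeds the ideal per-partition share.

    This is a sufficient condition for the data not being equally
    splittable by key ranges; it is not a necessary one.
    """
    return max(key_fractions.values()) > ideal_share(parallelism)


def min_load_factor(key_fractions: dict, parallelism: int) -> float:
    """Lower bound on (heaviest partition load) / (ideal partition load)."""
    return max(1.0, max(key_fractions.values()) * parallelism)


# Hypothetical distribution: one hot key with 10% of the records and
# 90 other keys with 1% each.
key_fractions = {"hot": 0.10}
key_fractions.update({f"k{i}": 0.01 for i in range(90)})

if __name__ == "__main__":
    for parallelism in (5, 10, 11, 20):
        print(parallelism,
              has_oversized_key(key_fractions, parallelism),
              min_load_factor(key_fractions, parallelism))
```

With this distribution the hot key fits into one balanced partition up to a parallelism of 10; above that, the partition holding it carries at least 0.10 * parallelism times the ideal load.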

Thanks
Chengxiang 

Re: A proposal about skew data handling in Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

First of all, thanks a lot for this extensive proposal! It contains a lot
of good observations and techniques for addressing data skew.

I have a few remarks:

1) The terms Input Contract and Output Contract were introduced in the first
scientific publications and are not used anymore. Input Contracts are what
we call operators or transformations today; the concept of Output Contracts
is completely gone.
In the current code, we have operators like Map, Reduce, and Join that
describe how data needs to be organized (by key, etc.) and UDFs that
process the data.

2) I would categorize skew as follows:

- UDF Call Complexity Skew: The input cardinalities of UDF calls differ
(only applicable to group-based operators such as GroupReduce and CoGroup)
or the computational complexity of UDF calls depends on the data and varies
a lot. UDF calls are the smallest parallelizable unit. It is not possible
to change that without changing the semantics. Combiners can help to reduce
the effect of skew for group-based operators.

- Input Partition Skew: The cardinality of parallel partitions varies. This
is handled by Flink as follows:
    - Lazy split assignment for data sources
    - Operators that do not require special partitioning (Map, Filter,
      Cross, etc.) just consume the output partitions of the preceding
      operator. Rebalance() can be used to enforce round-robin partitioning
      to equalize the size of all partitions.
    - Operators that require key-based partitioning use hash partitioning.
      Range partitioning can help address significant data skew.

- UDF Call Skew: The number of UDF calls per parallel partition varies.
This can be an issue for n-m joins, which essentially result in Cartesian
products.
    - UDF Call Skew is most relevant for Joins
    - UDF Call Skew for Map, Reduce, CoGroup, Cross can be controlled by
      controlling Input Partition Skew
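
To make the difference between Input Partition Skew and UDF Call Skew concrete, here is a small sketch. It does not use Flink; `partition_of` is a hypothetical stand-in for hash partitioning, and the key distributions are invented for illustration. It counts records per partition under key-based and round-robin partitioning, and the number of join UDF calls per partition for an n-m join.

```python
from collections import Counter


def partition_of(key: int, parallelism: int) -> int:
    """Stand-in for hash partitioning: one key always maps to one partition."""
    return key % parallelism


def hash_partition_sizes(keys, parallelism):
    """Records per partition when records are routed by key."""
    sizes = [0] * parallelism
    for key in keys:
        sizes[partition_of(key, parallelism)] += 1
    return sizes


def round_robin_sizes(num_records, parallelism):
    """Records per partition when records are dealt out round-robin."""
    base, extra = divmod(num_records, parallelism)
    return [base + (1 if i < extra else 0) for i in range(parallelism)]


def join_calls_per_partition(left_keys, right_keys, parallelism):
    """Join UDF calls per partition: for each key, left count * right count."""
    left, right = Counter(left_keys), Counter(right_keys)
    calls = [0] * parallelism
    for key, n_left in left.items():
        calls[partition_of(key, parallelism)] += n_left * right[key]
    return calls


def skew_factor(sizes):
    """Largest partition relative to the average partition."""
    return max(sizes) / (sum(sizes) / len(sizes))


# Invented data: key 0 is hot on both sides of the join.
left_keys = [0] * 1000 + [k for k in range(1, 100) for _ in range(10)]
right_keys = [0] * 100 + list(range(1, 100))

if __name__ == "__main__":
    p = 4
    print("key-based:  ", hash_partition_sizes(left_keys, p))
    print("round-robin:", round_robin_sizes(len(left_keys), p))
    print("join calls: ", join_calls_per_partition(left_keys, right_keys, p))
```

In this example the round-robin sizes are nearly equal, the key-based sizes are dominated by the hot key, and the join call counts are skewed far more strongly than the input sizes because the per-key counts multiply.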

3) I agree that we should not try to detect and automatically fix data skew
(at the moment) but give users tools to manually manage skew.

4) I would focus on addressing the Input Partition Skew problem. UDF Call
Complexity Skew cannot be addressed because it would change the semantics
of operators. UDF Call Skew only affects joins and is much harder to
solve.

5) I wonder how much practical gain there is in addressing the
Non-Range-Equally-Splittable case compared to the added code complexity. In
general, tackling skew is a very good idea, but solving corner cases with
quite complex methods might make future features more complicated to add.
Hence, I would propose to focus on the common and "easy" cases first.

I would address Input Partition Skew first and ignore the
Non-Range-Equally-Splittable case for now. We can do this in three steps:

1) Add the "simple" range partitioner as in your pull request for unary
operators (explicit range partitioning, total order, groupBy). Once the
sampling happens online, this is a very good addition to Flink.
2) Add the "simple" range partitioner also for binary operators (join,
coGroup). This will be a bit more tricky, because we need to make a
coordinated decision for both inputs.
3) Expose range partitioning for GroupBy, Join, CoGroup to the API, maybe
through optimizer hints.
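
The idea behind steps 1) and 2) can be sketched roughly as follows. This is not Flink's implementation or the code from the pull request; all function names and the `sample_fraction` parameter are hypothetical. Split keys are taken at evenly spaced ranks of a sorted sample, and for a binary operator both inputs are sampled at the same rate and routed with the same boundaries, so that equal keys from both sides meet in the same partition.

```python
import bisect
import random


def range_boundaries(sample, parallelism):
    """Pick parallelism - 1 split keys at evenly spaced ranks of the sample."""
    if not sample:
        raise ValueError("sample must not be empty")
    ordered = sorted(sample)
    return [ordered[(i * len(ordered)) // parallelism]
            for i in range(1, parallelism)]


def range_partition_of(key, boundaries):
    """Partition index of a key: the number of split keys <= key."""
    return bisect.bisect_right(boundaries, key)


def shared_boundaries(left_keys, right_keys, parallelism,
                      sample_fraction, seed=0):
    """One set of split keys for both inputs of a binary operator.

    Both inputs are sampled at the same rate so that the larger input
    contributes proportionally more sample points.
    """
    rng = random.Random(seed)
    sample = [k for k in left_keys if rng.random() < sample_fraction]
    sample += [k for k in right_keys if rng.random() < sample_fraction]
    return range_boundaries(sample, parallelism)


def partition_sizes(keys, boundaries):
    """Records per partition under the given split keys."""
    sizes = [0] * (len(boundaries) + 1)
    for key in keys:
        sizes[range_partition_of(key, boundaries)] += 1
    return sizes


if __name__ == "__main__":
    left = list(range(100))
    right = list(range(0, 100, 2))
    bounds = shared_boundaries(left, right, 4, sample_fraction=0.5)
    print(bounds, partition_sizes(left, bounds), partition_sizes(right, bounds))
```

If one key is heavy enough, several split keys collapse onto the same value and some partitions stay empty; that is the Non-Range-Equally-Splittable case, which this simple partitioner does not handle.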

Since we want to have this transparently handled by the API and engine, we
need to add a lot of these features into the optimizer, or
JobGraphGenerator to be more precise.

Does that make sense to you?

Cheers, Fabian


Re: A proposal about skew data handling in Flink

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

thanks for starting a discussion about data skew! I agree, it's an important
issue that can cause a lot of problems.
I'll have a look at your proposal and add comments soon.

Thanks, Fabian
