Posted to dev@flink.apache.org by Rong Rong <wa...@gmail.com> on 2018/10/01 15:48:23 UTC

Re: Handling burst I/O when using tumbling/sliding windows

Hi Piotrek,

Thanks for the quick response. To follow up on the questions:
Re 1). Yes, it is causing network I/O issues on Kafka itself.

Re 2a). Actually, I thought about it last weekend and I think there's a
workaround: we directly duplicated the key extraction logic in our
window assigner. Since the element record is passed in, it should be OK to
create a customized window assigner that handles key-based offsets by
extracting the key from the record.
This was the main part of my change: to let WindowAssignerContext
provide the current key information extracted from the KeyedStateBackend.

Re 2b). Thanks for the explanation, we will try to profile it! We've seen
some weird behavior previously when loading up the network buffers in
Flink, although it's very rare and hard to reproduce consistently.

Re 3) Regarding the event time offset: I think I might not have explained my
idea clearly. I added some more details to the doc. Please kindly take a
look.
In a nutshell, window offsets do not change the event time of records at
all. We simply change how the window assigner assigns records to windows
with different offsets.
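
To make the point above concrete, here is a minimal standalone sketch in plain Java. It is not Flink code: the class name OffsetWindowDemo and the method windowStart are made up for illustration, and it assumes fixed-size tumbling windows. It shows that different offsets only change which window a record lands in, while the record's event timestamp is never modified.

```java
public class OffsetWindowDemo {

    /**
     * Start of the tumbling window that contains the given timestamp,
     * for windows of length size whose boundaries are shifted by offset.
     * Hypothetical helper for illustration only.
     */
    public static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative even for negative inputs.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    public static void main(String[] args) {
        long size = 300_000L;        // 5-minute windows, in milliseconds
        long eventTime = 1_000_000L; // the record's event timestamp

        for (long offset : new long[] {0L, 60_000L, 120_000L}) {
            long start = windowStart(eventTime, offset, size);
            // Only the window boundaries move; eventTime is printed unchanged.
            System.out.println("offset=" + offset
                    + " window=[" + start + ", " + (start + size) + ")"
                    + " eventTime=" + eventTime);
        }
    }
}
```

Because each offset shifts the window boundaries, records assigned with different offsets fire at different points in time, which is what spreads the output over the window interval.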

--
Rong

On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Thanks for the response again :)
>
> Re 1). Do you mean that this extra burst external I/O network traffic is
> causing disturbance with other systems reading/writing from Kafka? With
> Kafka itself?
>
> Re 2a) Yes, it should be relatively simple, however any new brick makes
> the overall component more and more complicated, which has long term
> consequences in maintenance/refactoring/adding new features/just making
> reading the code more difficult etc.
>
> Re 2b) With setup of:
>
> WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink
>
> RateLimitingOperator would just slow down data processing via the standard
> back pressure mechanism. Flink by default allocates 10% of the memory to
> network buffers, so we could partially rely on them to buffer some smaller
> bursts without blocking the whole pipeline altogether. Essentially,
> RateLimitingOperator(maxSize = 0) would cause back pressure and slow down
> record emission from the WindowOperator. So yes, there would still be batch
> emission of the data in the WindowOperator itself, but it would be
> prolonged/slowed down in terms of wall time because of downstream back
> pressure caused by the RateLimitingOperator.
>
> Btw, with your proposal, with what event time do you want to emit the
> delayed data? If the event time of the produced records changes depending
> on whether window offsets are used, this can cause quite a lot of semantic
> problems and side effects for the downstream operators.
>
> Piotrek
>
> > On 28 Sep 2018, at 15:18, Rong Rong <wa...@gmail.com> wrote:
> >
> > Hi Piotrek,
> >
> > Thanks for getting back to me so quickly. Let me explain.
> >
> > Re 1). As I explained in the doc, we are using a basic Kafka-in Kafka-out
> > system with the same partition count on both sides. It is causing degraded
> > performance in external I/O network traffic.
> > It is definitely possible to configure more resources (e.g. a larger
> > partition count) for the output to handle the burst, but it can also be
> > resolved through some sort of internal smoothing (either through rate
> > limiting as you suggested, or through the dynamic offset).
> >
> > Re 2a). Yes, I agree, and I think I understand your concern. However, it is
> > one simple API addition with default fallbacks that are fully
> > backward-compatible (or I think it can be made fully compatible if I
> > missed any corner cases).
> > Re 2b). Yes, there could be many potential issues that cause data bursts.
> > However, putting aside the scenarios caused by the nature of the
> > stream (data skew, bursts), which affect both input and output, we want to
> > address specifically the case where a smooth input *deterministically*
> > results in burst output. What we are proposing here is kind of exactly
> > like the case of users' custom operators. However, we can't do so unless
> > there's an API to control the offset.
> >
> > Regarding the problem of rate limiting and skew: I think I missed one key
> > point from you, and I think you are right. If we introduce a *new rate
> > limiting operator* (with size > 0), it will
> >  - cause extra state usage within the container (moving all the
> > components from the window operator and storing them in the rate limit
> > buffer at window boundaries).
> >  - not cause the data skew problem. The data skew problem I mentioned is
> > that, if some data is buffered in window operator state longer than other
> > data, then potentially some panes will handle more late arrivals than
> > others.
> >
> > However, if it is possible to get rid of the extra memory usage, we will
> > definitely benchmark the rate-limit approach. Can you be more specific on
> > how setting the rate-limit operator (size = 0) can resolve the burst
> > issue? If I understand correctly, the backpressure will cause the
> > watermark not to advance, but once it crosses the window boundary, there
> > will still be a batch of messages emitted from the window operator at the
> > same time, correct?
> >
> > Thanks,
> > Rong
> >
> >
> >
> > On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com>
> > wrote:
> >
> >> Hi,
> >>
> >> Re 1. Can you be more specific? What system are you using, what’s
> >> happening and how does it break?
> >>
> >> While delaying window firing is probably the most cost-effective solution
> >> for this particular problem, it has some disadvantages:
> >> a) putting even more logic into an already complicated component
> >> b) not solving potential similar problems. I can easily imagine the same
> >> issue happening in scenarios other than “interval based operators”, such
> >> as:
> >>        - input sources faster than output sinks
> >>        - data skew
> >>        - data bursts
> >>        - users’ custom operators causing data bursts
> >>        - users’ custom operators being prone to bursts (maybe something
> >> like AsyncOperator or something else that works with an external system),
> >> so the problem might not necessarily be limited to the sinks
> >>
> >> As far as I recall, there were some users reporting some similar issues.
> >>
> >> Regarding potential drawbacks of rate limiting, I didn’t understand this
> >> part:
> >>
> >>> However the problem is similar to delay triggers which can provide
> >>> degraded performance for skew sensitive downstream service, such as
> >>> feeding feature extraction results to deep learning model.
> >>
> >>
> >> The way I imagine RateLimitingOperator is that it could take these
> >> parameters: rate limit, buffer size limit.
> >>
> >> With buffer size = 0, it would immediately cause back pressure if the
> >> rate is exceeded.
> >> With buffer size > 0, it would first buffer events in state and cause
> >> back pressure only when reaching the max buffer size.
> >>
> >> For the case with WindowOperator, if windows are evicted and removed from
> >> the state, using buffer size > 0 wouldn’t cause increased state usage; it
> >> would only move the state from the WindowOperator to the
> >> RateLimitingOperator.
> >>
> >> Piotrek
> >>
> >>> On 27 Sep 2018, at 17:28, Rong Rong <wa...@gmail.com> wrote:
> >>>
> >>> Hi Piotrek,
> >>>
> >>> Yes, to be more clear,
> >>> 1) the network I/O issue I am referring to is between Flink and the
> >>> external sink. We did not see issues between operators.
> >>> 2) yes, we've considered rate limiting sink functions as well, which is
> >>> also mentioned in the doc, along with some of the pros and cons we
> >>> identified.
> >>>
> >>> This kind of problem seems to only occur in WindowOperator so far, but
> >>> yes, it can probably occur in any aligned interval based operator.
> >>>
> >>> --
> >>> Rong
> >>>
> >>> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <piotr@data-artisans.com>
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> Thanks for the proposal. Could you provide more
> >>>> background/explanation/motivation for why you need such a feature? What
> >>>> do you mean by “network I/O” degradation?
> >>>>
> >>>> On their own, burst writes shouldn’t cause problems within Flink. If they
> >>>> do, we might want to fix the original underlying problem, and if they are
> >>>> causing problems in external systems, we also might think about other
> >>>> approaches to fix/handle the problem (write rate limiting?), which might
> >>>> be more general and not fix only bursts originating from WindowOperator.
> >>>> I’m not saying that your proposal is bad or anything, but I would just
> >>>> like to have more context :)
> >>>>
> >>>> Piotrek.
> >>>>
> >>>>> On 26 Sep 2018, at 19:21, Rong Rong <wa...@gmail.com> wrote:
> >>>>>
> >>>>> Hi Dev,
> >>>>>
> >>>>> I was wondering if there's any previous discussion regarding how to
> >>>>> handle burst network I/O when deploying Flink applications with window
> >>>>> operators.
> >>>>>
> >>>>> We've recently seen some significant network I/O degradation when trying
> >>>>> to use sliding windows to perform rolling aggregations. The pattern is
> >>>>> very periodic: output connections get no traffic for a period of time
> >>>>> until a burst at window boundaries (in our case every 5 minutes).
> >>>>>
> >>>>> We have drafted a doc
> >>>>> <https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing>
> >>>>> on how we propose to handle it to smooth the output traffic spikes.
> >>>>> Please kindly take a look; any comments and suggestions are highly
> >>>>> appreciated.
> >>>>>
> >>>>> --
> >>>>> Rong
> >>>>
> >>>>
> >>
> >>
>
>
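
The RateLimitingOperator(maxSize = 0) idea quoted above can be sketched roughly as follows. This is a standalone illustration, not an existing Flink operator: the class BlockingRateLimiter and its methods are hypothetical, and it assumes, as the thread describes, that an operator which stops consuming records is what produces back pressure upstream. With no buffer, the only thing the limiter can do when the rate is exceeded is make the caller wait.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical pacing helper with no internal buffer; not part of Flink. */
public final class BlockingRateLimiter {
    private final long nanosPerRecord;
    private long nextFreeNanos;
    private boolean started;

    public BlockingRateLimiter(long recordsPerSecond) {
        if (recordsPerSecond <= 0) {
            throw new IllegalArgumentException("rate must be positive");
        }
        this.nanosPerRecord = TimeUnit.SECONDS.toNanos(1) / recordsPerSecond;
    }

    /** Reserves the next emission slot; returns the required wait in nanoseconds. */
    public synchronized long reserve(long nowNanos) {
        if (!started || nextFreeNanos - nowNanos < 0) {
            // First record, or the limiter was idle: nothing to wait for.
            nextFreeNanos = nowNanos;
            started = true;
        }
        long wait = nextFreeNanos - nowNanos;
        nextFreeNanos += nanosPerRecord;
        return wait;
    }

    /** Blocks the calling thread until the record may be emitted. */
    public void acquire() throws InterruptedException {
        long wait = reserve(System.nanoTime());
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
    }

    public static void main(String[] args) {
        BlockingRateLimiter limiter = new BlockingRateLimiter(10); // 10 records/s
        // Simulate a window firing: 5 records all arrive at t = 0.
        for (int i = 0; i < 5; i++) {
            long waitMs = TimeUnit.NANOSECONDS.toMillis(limiter.reserve(0L));
            System.out.println("record " + i + " must wait " + waitMs + " ms");
        }
    }
}
```

In this sketch a burst is not dropped or buffered; it is stretched out in wall time, which matches the "prolonged/slowed down" behavior described for maxSize = 0. A variant with buffer size > 0 would queue records first and only start blocking once the queue is full.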

Re: Handling burst I/O when using tumbling/sliding windows

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Just to sum up this thread: the discussion in the aforementioned design doc concluded that there is no need for API changes. The semantics that Rong was asking for can be achieved by implementing a custom WindowAssigner that mimics one of the existing ones, like TumblingEventTimeWindows, and uses some kind of KeySelector to deterministically offset the assigned windows based on the extracted key. This KeySelector might (but does not have to) be the KeySelector used to key the stream before the WindowOperator. The important thing is that KeySelectors are obliged to deterministically return the same result over and over again for the same elements, without state access.
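
As a rough illustration of that conclusion, the core of such an assigner could look like the sketch below. It is deliberately free of Flink dependencies: the class KeyedOffsetWindows is hypothetical, a java.util.function.Function stands in for Flink's KeySelector, and deriving the offset from String.hashCode() is just one assumed way to get a deterministic, stateless offset. A real implementation would wrap this logic in a custom WindowAssigner.

```java
import java.util.function.Function;

/** Hypothetical core logic for a key-offset tumbling window assigner. */
public final class KeyedOffsetWindows<T> {
    private final long size;
    // Stand-in for a KeySelector: must be deterministic and stateless.
    private final Function<T, String> keyExtractor;

    public KeyedOffsetWindows(long size, Function<T, String> keyExtractor) {
        if (size <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.size = size;
        this.keyExtractor = keyExtractor;
    }

    /** Offset in [0, size), derived only from the element's key. */
    public long offsetFor(T element) {
        // String.hashCode() is specified by the JDK, so it is stable across JVMs.
        return Math.floorMod((long) keyExtractor.apply(element).hashCode(), size);
    }

    /** Start of the window for this element; the timestamp itself is not changed. */
    public long windowStart(T element, long timestamp) {
        return timestamp - Math.floorMod(timestamp - offsetFor(element), size);
    }

    public static void main(String[] args) {
        long size = 300_000L; // 5-minute windows, in milliseconds
        KeyedOffsetWindows<String> windows = new KeyedOffsetWindows<>(size, s -> s);
        long timestamp = 1_000_000L;
        for (String key : new String[] {"a", "b", "user-42"}) {
            long start = windows.windowStart(key, timestamp);
            System.out.println(key + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

Since the offset depends only on the key, every record with the same key is shifted by the same amount, so windows for one key stay aligned with each other while windows for different keys fire at different times. Hash codes of short keys cluster near zero, so a practical version would probably map the hash onto a small number of evenly spaced offset slots instead.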

Thanks Rong for driving the discussion :)

Piotrek

> On 10 Oct 2018, at 19:49, Rong Rong <wa...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> Thanks for the feedback and reviews.
> Yes, as I explained previously in reply to point (2b), I think it is possible to create our own customized window assigner without any API change if we eliminate the requirement that
> "the same key should always result in the same offset".
> I have updated the document to reflect this point. Perhaps you could give some more insight regarding this particular question.
> 
> The feedback and effort are much appreciated :-)
> 
> --
> Rong
> 
> 
> On Tue, Oct 9, 2018 at 12:15 AM Piotr Nowojski <piotr@data-artisans.com> wrote:
> Hi,
> 
> Sorry for getting back so late and thanks for the improved document :) I think now I got your idea.
> 
> You are now trying (or have you already done it?) to implement a custom window assigner, that would work as in the [Figure 3] from your document? 
> 
> 
> I think that indeed should be possible and relatively easy to do without the need for API changes.
> 
> Piotrek


Re: Handling burst I/O when using tumbling/sliding windows

Posted by Rong Rong <wa...@gmail.com>.
Hi Piotrek,

Thanks for the feedback and reviews.
Yes, as I explained previously in reply to point (2b), I think it is
possible to create our own customized window assigner without any API
change if we eliminate the requirement that

*"the same key should always result in the same offset"*

I have updated the document to reflect this point. Perhaps you could give
some more insight regarding this particular question.

The feedback and effort are much appreciated :-)

--
Rong


On Tue, Oct 9, 2018 at 12:15 AM Piotr Nowojski <pi...@data-artisans.com>
wrote:

> Hi,
>
> Sorry for getting back so late and thanks for the improved document :) I
> think now I got your idea.
>
> You are now trying (or have you already done it?) to implement a custom
> window assigner, that would work as in the [Figure 3] from your document?
>
> I think that indeed should be possible and relatively easy to do without
> the need for API changes.
>
> Piotrek
>
> On 1 Oct 2018, at 17:48, Rong Rong <wa...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for the quick response. To follow up with the questions:
> Re 1). Yes it is causing network I/O issues on Kafka itself.
>
> Re 2a). Actually. I thought about it last weekend and I think there's a way
> for a work around: We directly duplicated the key extraction logic in our
> window assigner. Since the element record is passed in, it should be OK to
> create a customized window assigner to handle offset-based on key by
> extracting the key from record
> This was the main part of my change: to let WindowAssignerContext to
> provide current key information extracted from KeyedStateBackend.
>
> Re 2b). Thanks for the explanation, we will try to profile it! We've seems
> some weird behaviors previously when loading up the network buffer in
> Flink, although it's very rare and inconsistent when trying to reproduce.
>
> Re 3) Regarding the event time offset. I think I might have not explain my
> idea clearly. I added some more details to the doc. Please kindly take a
> look.
> In a nutshell, window offsets does not change the event time of records at
> all. We simply changes how window assigner assigns records to windows with
> various different offsets.
>
> --
> Rong
>
> On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
> Hi,
>
> Thanks for the response again :)
>
> Re 1). Do you mean that this extra burst external I/O network traffic is
> causing disturbance with other systems reading/writing from Kafka? With
> Kafka itself?
>
> Re 2a) Yes, it should be relatively simple, however any new brick makes
> the overall component more and more complicated, which has long term
> consequences in maintenance/refactoring/adding new features/just making
> reading the code more difficult etc.
>
> Re 2b) With setup of:
>
> WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink
>
> RateLimitingOperator would just slow down data processing via standard
> back pressure mechanism. Flink by default allocates 10% of the memory to
> Network buffers we could partially relay on them to buffer some smaller
> bursts, without blocking whole pipeline altogether. Essentially
> RateLimitingOperator(maxSize = 0) would cause back pressure and slow down
> record emission from the WindowOperator. So yes, there would be still batch
> emission of the data in the WindowOperator itself, but it would be
> prolonged/slowed down in terms of wall time because of down stream back
> pressure caused by RateLimitingOperator.
>
> Btw, with your proposal, with what event time do you want to emit the
> delayed data? If the event time of the produced records changes based on
> using/not using windows offsets, this can cause quite a lot of semantic
> problems and side effects for the downstream operators.
>
> Piotrek
>
> On 28 Sep 2018, at 15:18, Rong Rong <wa...@gmail.com> wrote:
>
> Hi Piotrek,
>
> Thanks for getting back to me so quickly. Let me explain.
>
> Re 1). As I explained in the doc. we are using a basic Kafka-in Kafka-out
> system with same partition number on both side. It is causing degraded
> performance in external I/O network traffic.
> It is definitely possible to configure more resource (e.g. larger
>
> partition
>
> count) for output to handle the burst but it can also be resolved through
> some sort of smoothing through internal (either through rate limiting as
> you suggested, or through the dynamic offset).
>
> Re 2a). Yes I agree and I think I understand your concern. However it is
> one simple API addition with default fallbacks that are fully
> backward-compatible (or I think it be made fully compatible if I missed
>
> and
>
> corner cases).
> Re 2b). Yes. there could be many potential issues that causes data burst.
> However, putting aside the scenarios that was caused by the nature of the
> stream (data skew, bursts) that both affects input and output. We want to
> address specifically the case that a smooth input is *deterministically*
> resulting in burst output. What we are proposing here is kind of exactly
> like the case of users' customer operator. However we can't do so unless
> there's an API to control the offset.
>
> Regarding the problem of rate limiting and skew. I think I missed one key
> point from you. I think you are right. If we introduce a *new rate
>
> limiting
>
> operator *(with size > 0) it will
> - causes extra state usage within the container (moving all the
> components from window operator and store in rate limit buffer at window
> boundaries).
> - will not cause data skew problem: The data skew problem I mentioned is
> that, if data are buffered in window operator state longer for some data
> but not the other. Then potentially some panes will handle more late
> arrival than others.
>
> However if it is possible to get rid of the extra memory usage we will
> definitely benchmark the rate-limit approach. Can you be more specific on
> how setting the rate-limit operator (size = 0) can resolve the burst
>
> issue?
>
> If I understand correctly the backpressure will cause the watermark to
>
> not
>
> advance, but once it crosses the window boundary, there will still be a
> batch of messages emitting out of the window operator at the same time,
> correct?
>
> Thanks,
> Rong
>
>
>
> On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
>
> Hi,
>
> Re 1. Can you be more specific? What system are you using, what’s
> happening and how does it brake?
>
> While delaying windows firing is probably the most cost effective
>
> solution
>
> for this particular problem, it has some disadvantages:
> a) putting even more logic to already complicated component
> b) not solving potential similar problems. I can easily imagine the same
> issue happening to other scenarios then "interval based operators” such
>
> as:
>
>       - input sources faster then output sinks
>       - data skew
>       - data bursts
>       - users' custom operators causing data bursts
>       - users’ custom operators being prone to bursts (maybe something
> like AsyncOperator or something else that works with an external
>
> system) -
>
> so the problem might not necessarily be limited to the sinks
>
> As far as I recall, there were some users reporting some similar issues.
>
> Regarding potential drawbacks of rate limiting, I didn’t understand this
> part:
>
> However the problem is similar to delay triggers which can provide
>
> degraded performance for skew sensitive downstream service, such as
>
> feeding
>
> feature extraction results to deep learning model.
>
>
> The way how I could imagine RateLimitingOperator is that it could take a
> parameters: rate limits, buffer size limit.
>
> With buffer size = 0, it would cause immediately a back pressure if rate
> is exceeded
> With buffer size > 0, ti would first buffer events on the state and only
> when reaching max buffer size, causing the back pressure
>
> For the case with WindowOperator, if windows are evicted and removed
>
> from
>
> the state, using buffer size > 0, wouldn’t cause increased state usage,
>
> it
>
> would only move the state from the WindowOperator to the
> RateLimitingOperator.
>
> Piotrek
>
> On 27 Sep 2018, at 17:28, Rong Rong <wa...@gmail.com> wrote:
>
> HI Piotrek,
>
> Yes, to be more clear,
> 1) the network I/O issue I am referring to is in between Flink and
>
> external
>
> sink. We did not see issues in between operators.
> 2) yes we've considered rate limiting sink functions as well which is
>
> also
>
> mentioned in the doc. along with some of the the pro-con we identified.
>
> This kind of problem seems to only occur in WindowOperator so far, but
>
> yes
>
> it can probably occur to any aligned interval based operator.
>
> --
> Rong
>
> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <
>
> piotr@data-artisans.com
>
>
> wrote:
>
> Hi,
>
> Thanks for the proposal. Could you provide more
> background/explanation/motivation for why you need such a feature? What
> do you mean by “network I/O” degradation?
>
> On their own, burst writes shouldn’t cause problems within Flink. If they
> do, we might want to fix the original underlying problem, and if they are
> causing problems in external systems, we might also think about other
> approaches to fix/handle the problem (write rate limiting?), which might
> be more general and not only fix bursts originating from WindowOperator.
>
> I’m not saying that your proposal is bad or anything, but I would just
> like to have more context :)
>
> Piotrek.
>
> On 26 Sep 2018, at 19:21, Rong Rong <wa...@gmail.com> wrote:
>
> Hi Dev,
>
> I was wondering if there's any previous discussion regarding how to
> handle burst network I/O when deploying Flink applications with window
> operators.
>
> We've recently seen some significant network I/O degradation when trying
> to use sliding windows to perform rolling aggregations. The pattern is
> very periodic: output connections get no traffic for a period of time
> until a burst at window boundaries (in our case every 5 minutes).
>
> We have drafted a doc
> <https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing>
> on how we propose to handle it to smooth the output traffic spikes.
> Please kindly take a look; any comments and suggestions are highly
> appreciated.
>
>
> --
> Rong

Re: Handling burst I/O when using tumbling/sliding windows

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

Sorry for getting back so late and thanks for the improved document :) I think I now get your idea.

Are you now trying (or have you already done it?) to implement a custom window assigner that would work as in [Figure 3] of your document?

I think that should indeed be possible and relatively easy to do without the need for API changes.
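
For illustration, the per-key offset idea can be sketched in plain Java without any Flink types. This is only a minimal sketch under stated assumptions: the class and method names are hypothetical, and deriving the offset from the key's hash code is an assumed policy, not something taken from the design doc. In Flink the same arithmetic would presumably sit inside a custom WindowAssigner's assignWindows(element, timestamp, context), with the key extracted from the element as described in the quoted message below.

```java
import java.util.Objects;

/**
 * Sketch only: per-key window offsets using plain arithmetic, no Flink types.
 * All names in this class are hypothetical.
 */
final class KeyedOffsetWindows {

    /** Deterministic offset in [0, windowSize) derived from the key's hash (an assumed policy). */
    static long offsetForKey(Object key, long windowSize) {
        return Math.floorMod((long) Objects.hashCode(key), windowSize);
    }

    /** Start of the tumbling window of length windowSize, shifted by offset, that contains timestamp. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - Math.floorMod(timestamp - offset, windowSize);
    }

    /** Returns {start, end} (end exclusive) of the window assigned to a record with the given key. */
    static long[] assignWindow(Object key, long timestamp, long windowSize) {
        long start = windowStart(timestamp, offsetForKey(key, windowSize), windowSize);
        return new long[] {start, start + windowSize};
    }
}
```

With a 5 minute window (300000 ms), keys with different offsets would reach their window end at different points within each 5 minute period, which spreads the emission over time. The record timestamps themselves are never modified; only the window boundaries move.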

Piotrek

> On 1 Oct 2018, at 17:48, Rong Rong <wa...@gmail.com> wrote:
> 
> Hi Piotrek,
> 
> Thanks for the quick response. To follow up with the questions:
> Re 1). Yes it is causing network I/O issues on Kafka itself.
> 
> Re 2a). Actually, I thought about it last weekend and I think there's a
> workaround: We directly duplicated the key extraction logic in our
> window assigner. Since the element record is passed in, it should be OK to
> create a customized window assigner that handles key-based offsets by
> extracting the key from the record.
> This was the main part of my change: to let WindowAssignerContext
> provide the current key information extracted from KeyedStateBackend.
> 
> Re 2b). Thanks for the explanation, we will try to profile it! We've seen
> some weird behaviors previously when loading up the network buffer in
> Flink, although it's very rare and inconsistent when trying to reproduce.
> 
> Re 3) Regarding the event time offset: I think I might not have explained my
> idea clearly. I added some more details to the doc. Please kindly take a
> look.
> In a nutshell, window offsets do not change the event time of records at
> all. We simply change how the window assigner assigns records to windows
> with different offsets.
> 
> --
> Rong
> 
> On Fri, Sep 28, 2018 at 8:03 AM Piotr Nowojski <pi...@data-artisans.com>
> wrote:
> 
>> Hi,
>> 
>> Thanks for the response again :)
>> 
>> Re 1). Do you mean that this extra burst external I/O network traffic is
>> causing disturbance with other systems reading/writing from Kafka? With
>> Kafka itself?
>> 
>> Re 2a) Yes, it should be relatively simple, however any new brick makes
>> the overall component more and more complicated, which has long term
>> consequences in maintenance/refactoring/adding new features/just making
>> reading the code more difficult etc.
>> 
>> Re 2b) With setup of:
>> 
>> WindowOperator -> RateLimitingOperator(maxSize = 0) -> Sink
>> 
>> RateLimitingOperator would just slow down data processing via the standard
>> back pressure mechanism. Flink by default allocates 10% of the memory to
>> network buffers, and we could partially rely on them to buffer some smaller
>> bursts without blocking the whole pipeline altogether. Essentially,
>> RateLimitingOperator(maxSize = 0) would cause back pressure and slow down
>> record emission from the WindowOperator. So yes, there would still be batch
>> emission of the data in the WindowOperator itself, but it would be
>> prolonged/slowed down in terms of wall time because of downstream back
>> pressure caused by RateLimitingOperator.
>> 
>> Btw, with your proposal, with what event time do you want to emit the
>> delayed data? If the event time of the produced records changes based on
>> using/not using windows offsets, this can cause quite a lot of semantic
>> problems and side effects for the downstream operators.
>> 
>> Piotrek
>> 
>>> On 28 Sep 2018, at 15:18, Rong Rong <wa...@gmail.com> wrote:
>>> 
>>> Hi Piotrek,
>>> 
>>> Thanks for getting back to me so quickly. Let me explain.
>>> 
>>> Re 1). As I explained in the doc, we are using a basic Kafka-in Kafka-out
>>> system with the same partition number on both sides. It is causing degraded
>>> performance in external I/O network traffic.
>>> It is definitely possible to configure more resources (e.g. a larger
>>> partition count) for output to handle the burst, but it can also be
>>> resolved through some sort of internal smoothing (either through rate
>>> limiting as you suggested, or through the dynamic offset).
>>> 
>>> Re 2a). Yes, I agree and I think I understand your concern. However, it is
>>> one simple API addition with default fallbacks that are fully
>>> backward-compatible (or I think it can be made fully compatible if I
>>> missed any corner cases).
>>> Re 2b). Yes, there could be many potential issues that cause data bursts.
>>> However, putting aside the scenarios caused by the nature of the
>>> stream (data skew, bursts) that affect both input and output, we want to
>>> address specifically the case where a smooth input *deterministically*
>>> results in burst output. What we are proposing here is kind of exactly
>>> like the case of users' custom operators. However, we can't do so unless
>>> there's an API to control the offset.
>>> 
>>> Regarding the problem of rate limiting and skew: I think I missed one key
>>> point from you. I think you are right. If we introduce a *new rate
>>> limiting operator* (with size > 0), it will
>>> - cause extra state usage within the container (moving all the
>>> components from the window operator and storing them in the rate limit
>>> buffer at window boundaries).
>>> - not cause the data skew problem. The data skew problem I mentioned is
>>> that data are buffered in window operator state longer for some data
>>> than for others, so potentially some panes will handle more late
>>> arrivals than others.
>>> 
>>> However, if it is possible to get rid of the extra memory usage, we will
>>> definitely benchmark the rate-limit approach. Can you be more specific on
>>> how setting the rate-limit operator (size = 0) can resolve the burst
>>> issue? If I understand correctly, the backpressure will cause the
>>> watermark to not advance, but once it crosses the window boundary, there
>>> will still be a batch of messages emitted from the window operator at the
>>> same time, correct?
>>> 
>>> Thanks,
>>> Rong
>>> 
>>> 
>>> 
>>> On Fri, Sep 28, 2018 at 1:25 AM Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> 
>>>> Hi,
>>>> 
>>>> Re 1. Can you be more specific? What system are you using, what’s
>>>> happening and how does it break?
>>>>
>>>> While delaying window firing is probably the most cost-effective
>>>> solution for this particular problem, it has some disadvantages:
>>>> a) putting even more logic into an already complicated component
>>>> b) not solving potential similar problems. I can easily imagine the same
>>>> issue happening in scenarios other than “interval based operators”, such
>>>> as:
>>>>       - input sources faster than output sinks
>>>>       - data skew
>>>>       - data bursts
>>>>       - users’ custom operators causing data bursts
>>>>       - users’ custom operators being prone to bursts (maybe something
>>>> like AsyncOperator or something else that works with an external
>>>> system), so the problem might not necessarily be limited to the sinks
>>>> 
>>>> As far as I recall, there were some users reporting some similar issues.
>>>> 
>>>> Regarding potential drawbacks of rate limiting, I didn’t understand this
>>>> part:
>>>> 
>>>>> However the problem is similar to delay triggers which can provide
>>>>> degraded performance for skew sensitive downstream service, such as
>>>>> feeding feature extraction results to deep learning model.
>>>> 
>>>> 
>>>> The way I could imagine RateLimitingOperator is that it could take two
>>>> parameters: rate limit and buffer size limit.
>>>>
>>>> With buffer size = 0, it would immediately cause back pressure if the rate
>>>> is exceeded.
>>>> With buffer size > 0, it would first buffer events in state and only
>>>> cause back pressure when reaching the max buffer size.
>>>>
>>>> For the case with WindowOperator, if windows are evicted and removed
>>>> from the state, using buffer size > 0 wouldn’t cause increased state
>>>> usage; it would only move the state from the WindowOperator to the
>>>> RateLimitingOperator.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 27 Sep 2018, at 17:28, Rong Rong <wa...@gmail.com> wrote:
>>>>> 
>>>>> Hi Piotrek,
>>>>>
>>>>> Yes, to be more clear,
>>>>> 1) the network I/O issue I am referring to is in between Flink and the
>>>>> external sink. We did not see issues in between operators.
>>>>> 2) yes, we've considered rate limiting sink functions as well, which is
>>>>> also mentioned in the doc, along with some of the pros and cons we
>>>>> identified.
>>>>>
>>>>> This kind of problem seems to only occur in WindowOperator so far, but
>>>>> yes, it can probably occur in any aligned interval based operator.
>>>>> 
>>>>> --
>>>>> Rong
>>>>> 
>>>>> On Wed, Sep 26, 2018 at 11:44 PM Piotr Nowojski <piotr@data-artisans.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi,
>>>>>> 
>>>>>> Thanks for the proposal. Could you provide more
>>>>>> background/explanation/motivation for why you need such a feature? What
>>>>>> do you mean by “network I/O” degradation?
>>>>>>
>>>>>> On their own, burst writes shouldn’t cause problems within Flink. If they
>>>>>> do, we might want to fix the original underlying problem, and if they are
>>>>>> causing problems in external systems, we might also think about other
>>>>>> approaches to fix/handle the problem (write rate limiting?), which might
>>>>>> be more general and not only fix bursts originating from WindowOperator.
>>>>>>
>>>>>> I’m not saying that your proposal is bad or anything, but I would just
>>>>>> like to have more context :)
>>>>>> 
>>>>>> Piotrek.
>>>>>> 
>>>>>>> On 26 Sep 2018, at 19:21, Rong Rong <wa...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Hi Dev,
>>>>>>> 
>>>>>>> I was wondering if there's any previous discussion regarding how to
>>>>>>> handle burst network I/O when deploying Flink applications with window
>>>>>>> operators.
>>>>>>>
>>>>>>> We've recently seen some significant network I/O degradation when trying
>>>>>>> to use sliding windows to perform rolling aggregations. The pattern is
>>>>>>> very periodic: output connections get no traffic for a period of time
>>>>>>> until a burst at window boundaries (in our case every 5 minutes).
>>>>>>>
>>>>>>> We have drafted a doc
>>>>>>> <https://docs.google.com/document/d/1fEhbcRgxxX8zFYD_iMBG1DCbHmTcTRfRQFXelPhMFiY/edit?usp=sharing>
>>>>>>> on how we propose to handle it to smooth the output traffic spikes.
>>>>>>> Please kindly take a look; any comments and suggestions are highly
>>>>>>> appreciated.
>>>>>>> 
>>>>>>> --
>>>>>>> Rong
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>>