Posted to user@flink.apache.org by Vishal Santoshi <vi...@gmail.com> on 2018/01/02 15:02:07 UTC

BackPressure handling

I ran a simulation on session windows ( in 2 modes ) and let it rip for
about 12 hours:

1. Replay, where a Kafka topic with a retention of 7 days was the source (
earliest )
2. Start the pipe with the Kafka source ( latest )

I saw results that differed dramatically.

On replay the pipeline stalled after a good ramp-up, while in the second case
the pipeline hummed along without issues. Over the same time period
significantly more data was consumed in the second case, while the WM
progression stalled in the first case with no hint of resolution ( the
incoming data on the source topic far outstrips the WM progression ). I think
I know the reasons, and this is my hypothesis.

In replay mode the number of open windows has no upper bound. While
buffer exhaustion ( with data and watermarks in flight ) is what triggers
throttling, it does not really limit the number of open windows, and in fact
windows get created for futuristic data ( future relative to the current WM
). So if partition x has data for watermark time t(x) and partition y for
watermark time t(y), with t(x) << t(y) and the overall watermark at t(x),
nothing significantly throttles consumption from partition y ( or from x,
for that matter ). AFAIK the bounded-buffer approach does not give the
fine-grained control one would hope for, which implies there are far more
open windows than the system can handle. That leads to the pathological case
where the buffers fill up ( I believe that happens way too late ) and
throttling kicks in, but the WM does not advance, so the windows that could
ease the glut cannot proceed. In replay mode the amount of data means that
the Fetchers keep pulling data at the maximum rate allowed by the open-ended
buffer approach.
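
To make the t(x) << t(y) scenario concrete, here is a minimal sketch in plain
Java (no Flink dependency). It assumes the overall watermark is the minimum of
the per-partition watermarks, as described above; the class and method names
are made up for illustration and are not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class WatermarkSkewSketch {

    /** Overall watermark: the minimum of the per-partition watermarks. */
    static long overallWatermark(Map<String, Long> partitionWatermarks) {
        if (partitionWatermarks.isEmpty()) {
            throw new IllegalArgumentException("need at least one partition");
        }
        long min = Long.MAX_VALUE;
        for (long wm : partitionWatermarks.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }

    /**
     * Event-time span between the slowest and the fastest partition.
     * Windows that fall into this span can be created but cannot fire yet.
     */
    static long pendingSpan(Map<String, Long> partitionWatermarks) {
        long max = Long.MIN_VALUE;
        for (long wm : partitionWatermarks.values()) {
            max = Math.max(max, wm);
        }
        return max - overallWatermark(partitionWatermarks);
    }

    public static void main(String[] args) {
        Map<String, Long> wms = new LinkedHashMap<>();
        wms.put("x", 1_000L);   // slow partition, t(x)
        wms.put("y", 90_000L);  // fast partition, t(y)
        System.out.println("overall WM   = " + overallWatermark(wms));
        System.out.println("pending span = " + pendingSpan(wms));
    }
}
```

The larger the pending span, the more session windows are held open waiting
for partition x, regardless of how full the network buffers are.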

My question is thus: is there any way to get finer control over back
pressure, where consumption from a source is throttled preemptively
( for example by decreasing the buffers associated with a pipe, or the size
allocated ), or via sleeps in the Fetcher code, so that the performance
aligns with real-time consumption characteristics?

Regards,

Vishal.

Re: BackPressure handling

Posted by Vishal Santoshi <vi...@gmail.com>.
Absolutely.

But without a view into a global WM at the source level, the --> would
require sources to wait for sources that are "slower in time" --> part is not
possible for folks creating custom sources or extending existing ones. I
would have loved to use a more scientific/data-driven approach to slow down
the aggressive consumers, but without a fundamental API change where
sources know the current global WM it is not possible. Further, resources
are at a premium: if I can, and should be able to, execute at a given rqs, a
replay and a real-time run should not show differing characteristics IMHO.

The more I work with Flink, the more I realize that the crux of Flink is WM
generation ( if we are doing Event Processing ), and I would want the
current WM at an operator to be exposed in all places feasible. I have
another email thread on why WM-driven preemptive analysis of an ordered
list of events within a window is desirable, but I could not do it using the
natural API ( Accumulator based ) because of the paucity of the WM view.

Anyway, it does work as of now, but I know it is an interim solution.

Regards

Vishal

On Wed, Jan 3, 2018 at 9:55 AM, Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I think your analysis is very thorough and accurate. However, I don't
> think that https://issues.apache.org/jira/browse/FLINK-7282 will solve
> this problem. We're dealing with "time back-pressure" here and not
> traditional processing back-pressure. (Not sure if there's a term for this
> so I coined a new one... 😉). The problem is that some Kafka consumers
> progress faster in event-time than others which leads to windows being
> spawned at the window operator that only trigger (and get deleted) once all
> sources catch up to that point in event time.
>
> Simply slowing down all reading mitigates this somewhat but a proper
> solution would require sources to wait for sources that are "slower in
> time".
>
> Does that make sense to you?
>
> Best,
> Aljoscha

Re: BackPressure handling

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I think your analysis is very thorough and accurate. However, I don't think that https://issues.apache.org/jira/browse/FLINK-7282 will solve this problem. We're dealing with "time back-pressure" here and not traditional processing back-pressure. (Not sure if there's a term for this so I coined a new one... 😉). The problem is that some Kafka consumers progress faster in event-time than others which leads to windows being spawned at the window operator that only trigger (and get deleted) once all sources catch up to that point in event time.

Simply slowing down all reading mitigates this somewhat but a proper solution would require sources to wait for sources that are "slower in time".
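
The idea of sources waiting for sources that are "slower in time" could be
sketched roughly as below. This is plain Java, not Flink API: it assumes each
source could somehow see the watermarks of all sources (which, as discussed
elsewhere in this thread, is not exposed), that timestamps are non-negative
epoch milliseconds, and that a hypothetical maximum drift is configured.

```java
final class EventTimeAlignmentSketch {

    // Hypothetical bound on how far a source may run ahead in event time.
    private final long maxDriftMillis;

    EventTimeAlignmentSketch(long maxDriftMillis) {
        if (maxDriftMillis < 0) {
            throw new IllegalArgumentException("maxDriftMillis must be >= 0");
        }
        this.maxDriftMillis = maxDriftMillis;
    }

    /**
     * Returns true if a source at localWatermark should pause reading because
     * it is more than maxDriftMillis ahead of the slowest source.
     * Assumes non-negative epoch-millisecond watermarks.
     */
    boolean shouldPause(long localWatermark, long[] allSourceWatermarks) {
        if (allSourceWatermarks.length == 0) {
            throw new IllegalArgumentException("need at least one source watermark");
        }
        long globalMin = Long.MAX_VALUE;
        for (long wm : allSourceWatermarks) {
            globalMin = Math.min(globalMin, wm);
        }
        return localWatermark - globalMin > maxDriftMillis;
    }

    public static void main(String[] args) {
        EventTimeAlignmentSketch gate = new EventTimeAlignmentSketch(5_000L);
        long[] wms = {1_000L, 90_000L};
        System.out.println("fast source pauses: " + gate.shouldPause(90_000L, wms));
        System.out.println("slow source pauses: " + gate.shouldPause(1_000L, wms));
    }
}
```

Unlike a fixed rate cap, a gate like this only slows the sources that are
ahead in event time, which bounds the span of open windows directly.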

Does that make sense to you?

Best,
Aljoscha

> On 3. Jan 2018, at 15:45, Vishal Santoshi <vi...@gmail.com> wrote:
> 
> To add to this, here is an interim solution to the issue.
> 
> I followed the advice "custom source/adapt an existing source" and put in a RateLimiter ( Guava ) that effectively puts a cap on each Kafka consumer ( x times the expected incident rqs ). That solved the issue, as in it stabilized the flow into the downstream window operation ( I use ProcessFunction for sessionizing; why is another discussion ).
> 
> This leads me to these conclusions, any of which could be correct:
> 
> 
> * The Kafka partitions are on different brokers ( the leaders ), and depending on how efficient each broker is ( whether it has the data in the OS cache, or whether there is a skew in leader distribution and thus more stress on n of m brokers ), the consumption speed can vary.
> * With a skew in the number of consumers per Flink node ( if no. of partitions % no. of Flink nodes == 0 there is no skew ), the consumption rate can vary.
> * Some TM nodes may be sluggish.
> 
> Either way, any of the above can cause data to be consumed at different speeds, which can lead to an imbalance and, because of the paucity of fine-grained back pressure handling, to more windows remaining open than is prudent: windows that reflect the more aggressive consumption ( and thus more variance from the current WM ). That causes the pathological case described above. Regulating the consumption rate ( I put the limiter in the extractTimestamp method ) effectively capped the more aggressive consumers at a fixed upper bound, making the rate of consumption across consumers effectively similar.
> 
> Either way, it seems imperative that https://issues.apache.org/jira/browse/FLINK-7282 be finalized at the earliest. The consequences on a shared Flink cluster are too huge IMHO.
> 
> Please tell me if my conclusions are problematic or do not make sense.
> 
> 
> Regards 
> 
> Vishal


Re: BackPressure handling

Posted by Vishal Santoshi <vi...@gmail.com>.
To add to this, here is an interim solution to the issue.

I followed the advice "custom source/adapt an existing source" and put in a
RateLimiter ( Guava ) that effectively puts a cap on each Kafka consumer
( x times the expected incident rqs ). That solved the issue, as in it
stabilized the flow into the downstream window operation ( I use
ProcessFunction for sessionizing; why is another discussion ).

This leads me to these conclusions, any of which could be correct:


* The Kafka partitions are on different brokers ( the leaders ), and
depending on how efficient each broker is ( whether it has the data in the
OS cache, or whether there is a skew in leader distribution and thus more
stress on n of m brokers ), the consumption speed can vary.
* With a skew in the number of consumers per Flink node ( if no. of
partitions % no. of Flink nodes == 0 there is no skew ), the consumption
rate can vary.
* Some TM nodes may be sluggish.
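
The "partitions % nodes == 0" condition in the second bullet can be checked
with a small sketch. It assumes partitions are dealt out round-robin to the
consumers, which is a simplification for illustration and not necessarily how
the Flink Kafka consumer assigns them; the names are hypothetical.

```java
final class PartitionSkewSketch {

    /** Partitions per consumer when dealt out round-robin (an assumption). */
    static int[] partitionsPerConsumer(int partitions, int consumers) {
        if (consumers <= 0 || partitions < 0) {
            throw new IllegalArgumentException("consumers must be > 0, partitions >= 0");
        }
        int[] counts = new int[consumers];
        for (int p = 0; p < partitions; p++) {
            counts[p % consumers]++;
        }
        return counts;
    }

    /** Difference between the most and least loaded consumer; 0 means no skew. */
    static int skew(int partitions, int consumers) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int c : partitionsPerConsumer(partitions, consumers)) {
            min = Math.min(min, c);
            max = Math.max(max, c);
        }
        return max - min;
    }

    public static void main(String[] args) {
        System.out.println("12 partitions, 4 consumers: skew = " + skew(12, 4));
        System.out.println("10 partitions, 4 consumers: skew = " + skew(10, 4));
    }
}
```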

Either way, any of the above can cause data to be consumed at different
speeds, which can lead to an imbalance and, because of the paucity of
fine-grained back pressure handling, to more windows remaining open than is
prudent: windows that reflect the more aggressive consumption ( and thus
more variance from the current WM ). That causes the pathological case
described above. Regulating the consumption rate ( I put the limiter in the
extractTimestamp method ) effectively capped the more aggressive consumers
at a fixed upper bound, making the rate of consumption across consumers
effectively similar.
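
A rough sketch of the kind of cap described above, using only the JDK. The
message used Guava's RateLimiter; the hand-rolled pacer below is a stand-in so
the example has no dependencies, and the class, its extractTimestamp method
and the permits-per-second figure are illustrative assumptions, not the actual
code.

```java
import java.util.concurrent.TimeUnit;

final class PacedLimiterSketch {

    private final long intervalNanos; // minimum spacing between permits
    private long nextFreeNanos;       // earliest time the next permit is free
    private boolean started;

    PacedLimiterSketch(double permitsPerSecond) {
        if (permitsPerSecond <= 0) {
            throw new IllegalArgumentException("permitsPerSecond must be > 0");
        }
        this.intervalNanos = (long) (1_000_000_000L / permitsPerSecond);
    }

    /** Reserves one permit and returns how long the caller has to wait (ns). */
    synchronized long reserve(long nowNanos) {
        // After an idle period, do not accumulate unused permits.
        if (!started || nowNanos - nextFreeNanos > 0) {
            nextFreeNanos = nowNanos;
            started = true;
        }
        long wait = nextFreeNanos - nowNanos;
        nextFreeNanos += intervalNanos;
        return wait;
    }

    /** Blocks until a permit is available. */
    void acquire() throws InterruptedException {
        long wait = reserve(System.nanoTime());
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
    }

    /** Hypothetical hook: throttle once per record, then hand back its timestamp. */
    long extractTimestamp(long eventTimestampMillis) throws InterruptedException {
        acquire();
        return eventTimestampMillis;
    }
}
```

Blocking in the per-record path like this caps every consumer at the same
rate; it narrows the gap between fast and slow consumers but does not align
them in event time.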

Either way, it seems imperative that
https://issues.apache.org/jira/browse/FLINK-7282 be finalized at the
earliest. The consequences on a shared Flink cluster are too huge IMHO.

Please tell me if my conclusions are problematic or do not make sense.


Regards

Vishal






On Tue, Jan 2, 2018 at 3:04 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> Also note that if I were to start 2 pipelines
>
> 1. Working off the head of the topic and thus not prone to the
> pathological case described above
> 2. Doing a replay and thus prone to the  pathological case described above
>
> then the 2nd pipe will stall the 1st pipeline. This seems to point to
>
>    - All channels multiplexed into the same TCP connection stall
>    together, as soon as one channel has backpressure.
>
>
> of the jira issue. This has to be a priority IMHO, in a shared VM where
> jobs should have at least some isolation.

Re: BackPressure handling

Posted by Vishal Santoshi <vi...@gmail.com>.
Also note that if I were to start 2 pipelines

1. Working off the head of the topic and thus not prone to the pathological
case described above
2. Doing a replay and thus prone to the pathological case described above

then the 2nd pipe will stall the 1st pipeline. This seems to point to

   - All channels multiplexed into the same TCP connection stall together,
   as soon as one channel has backpressure.


of the JIRA issue. This has to be a priority IMHO, in a shared VM where
jobs should have at least some isolation.

On Tue, Jan 2, 2018 at 2:19 PM, Vishal Santoshi <vi...@gmail.com>
wrote:

> Thank you.

Re: BackPressure handling

Posted by Vishal Santoshi <vi...@gmail.com>.
Thank you.

On Tue, Jan 2, 2018 at 1:31 PM, Nico Kruber <ni...@data-artisans.com> wrote:

> Hi Vishal,
> let me already point you towards the JIRA issue for the credit-based
> flow control: https://issues.apache.org/jira/browse/FLINK-7282
>
> I'll have a look at the rest of this email thread tomorrow...
>
>
> Regards,
> Nico
>
> On 02/01/18 17:52, Vishal Santoshi wrote:
> > Could you please point me to any documentation on the  "credit-based
> > flow control" approach....
> >
> > On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther <twalthr@apache.org
> > <ma...@apache.org>> wrote:
> >
> >     Hi Vishal,
> >
> >     your assumptions sound reasonable to me. The community is currently
> >     working on more fine-grained back pressure with credit-based
> >     flow control. It is on the roadmap for 1.5 [1]/[2]. I will loop in
> >     Nico, who might tell you more about the details. Until then, I guess
> >     you have to implement a custom source or adapt an existing source to
> >     let the data flow in more realistically.
> >
> >     Regards,
> >     Timo
> >
> >     [1]
> >     http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
> >     <http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html>
> >     [2] https://www.youtube.com/watch?v=scStdhz9FHc
> >     <https://www.youtube.com/watch?v=scStdhz9FHc>
> >
> >
> >     On 1/2/18 at 4:02 PM, Vishal Santoshi wrote:
> >
> >         I did a simulation on session windows ( in 2 modes ) and let it
> >         rip for about 12 hours
> >
> >         1. Replay where a kafka topic with retention of 7 days was the
> >         source ( earliest )
> >         2. Start the pipe with kafka source ( latest )
> >
> >         I saw results that differed dramatically.
> >
> >         On replay the pipeline stalled after  good ramp up while in the
> >         second case the pipeline hummed on without issues. For the same
> >         time period the data consumed is significantly more in the
> >         second case with the WM progression stalled in the first case
> >         with no hint of resolution ( the incoming data on source topic
> >         far outstrips the WM progression )  I think I know the reasons
> >         and this is my hypothesis.
> >
> >         In replay mode the number of windows open do not have an upper
> >         bound. While buffer exhaustion ( and data in flight with
> >         watermark )  is the reason for throttle, it does not really
> >         limit the open windows and in fact creates windows that reflect
> >         futuristic data ( future is relative to the current WM ) . So if
> >         partition x has data for watermark time t(x) and partition y for
> >         watermark time t(y) and t(x) << t(y) where the overall watermark
> >         is t(x) nothing significantly throttles consumption from the y
> >         partition ( in fact for x too ) , the bounded buffer based
> >         approach does not give minute control AFAIK as one would hope
> >         and that implies there are far more open windows than the system
> >         can handle and that leads to the pathological case where the
> >         buffers fill up  ( I believe that happens way late ) and
> >         throttling occurs but the WM does not proceed and windows that
> >         could ease the glut the throttling cannot proceed..... In the
> >         replay mode the amount of data implies that the Fetchers keep
> >         pulling data at the maximum consumption allowed by the open
> >         ended buffer approach.
> >
> >         My question thus is, is there any way to have a finer control of
> >         back pressure, where in the consumption from a source is
> >         throttled preemptively ( by for example decreasing the buffers
> >         associated for a pipe or the size allocated ) or sleeps in the
> >         Fetcher code that can help aligning the performance to have real
> >         time consumption  characteristics
> >
> >         Regards,
> >
> >         Vishal.

Re: BackPressure handling

Posted by Nico Kruber <ni...@data-artisans.com>.
Hi Vishal,
let me already point you towards the JIRA issue for the credit-based
flow control: https://issues.apache.org/jira/browse/FLINK-7282

I'll have a look at the rest of this email thread tomorrow...


Regards,
Nico

On 02/01/18 17:52, Vishal Santoshi wrote:
> Could you please point me to any documentation on the "credit-based
> flow control" approach....
> 
> On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther <twalthr@apache.org
> <ma...@apache.org>> wrote:
> 
>     Hi Vishal,
> 
>     your assumptions sound reasonable to me. The community is currently
>     working on more fine-grained back pressure with credit-based
>     flow control. It is on the roadmap for 1.5 [1]/[2]. I will loop in
>     Nico, who might tell you more about the details. Until then, I guess
>     you have to implement a custom source or adapt an existing source to
>     let the data flow in more realistically.
> 
>     Regards,
>     Timo
> 
>     [1]
>     http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
>     <http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html>
>     [2] https://www.youtube.com/watch?v=scStdhz9FHc
>     <https://www.youtube.com/watch?v=scStdhz9FHc>
> 
> 
>     On 1/2/18 at 4:02 PM, Vishal Santoshi wrote:
> 
>         I did a simulation on session windows ( in 2 modes ) and let it
>         rip for about 12 hours
> 
>         1. Replay where a kafka topic with retention of 7 days was the
>         source ( earliest )
>         2. Start the pipe with kafka source ( latest )
> 
>         I saw results that differed dramatically.
> 
>         On replay the pipeline stalled after good ramp up while in the
>         second case the pipeline hummed on without issues. For the same
>         time period the data consumed is significantly more in the
>         second case with the WM progression stalled in the first case
>         with no hint of resolution ( the incoming data on source topic
>         far outstrips the WM progression ) I think I know the reasons
>         and this is my hypothesis.
> 
>         In replay mode the number of windows open do not have an upper
>         bound. While buffer exhaustion ( and data in flight with
>         watermark ) is the reason for throttle, it does not really
>         limit the open windows and in fact creates windows that reflect
>         futuristic data ( future is relative to the current WM ) . So if
>         partition x has data for watermark time t(x) and partition y for
>         watermark time t(y) and t(x) << t(y) where the overall watermark
>         is t(x) nothing significantly throttles consumption from the y
>         partition ( in fact for x too ) , the bounded buffer based
>         approach does not give minute control AFAIK as one would hope
>         and that implies there are far more open windows than the system
>         can handle and that leads to the pathological case where the
>         buffers fill up ( I believe that happens way late ) and
>         throttling occurs but the WM does not proceed and windows that
>         could ease the glut the throttling cannot proceed..... In the
>         replay mode the amount of data implies that the Fetchers keep
>         pulling data at the maximum consumption allowed by the open
>         ended buffer approach.
> 
>         My question thus is, is there any way to have a finer control of
>         back pressure, where in the consumption from a source is
>         throttled preemptively ( by for example decreasing the buffers
>         associated for a pipe or the size allocated ) or sleeps in the
>         Fetcher code that can help aligning the performance to have real
>         time consumption characteristics
> 
>         Regards,
> 
>         Vishal.


Re: BackPressure handling

Posted by Vishal Santoshi <vi...@gmail.com>.
Could you please point me to any documentation on the  "credit-based flow
control" approach....

On Tue, Jan 2, 2018 at 10:35 AM, Timo Walther <tw...@apache.org> wrote:

> Hi Vishal,
>
> your assumptions sound reasonable to me. The community is currently
> working on more fine-grained back pressure with credit-based flow
> control. It is on the roadmap for 1.5 [1]/[2]. I will loop in Nico, who
> might tell you more about the details. Until then, I guess you have to
> implement a custom source or adapt an existing source to let the data
> flow in more realistically.
>
> Regards,
> Timo
>
> [1] http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
> [2] https://www.youtube.com/watch?v=scStdhz9FHc
>
>
> On 1/2/18 at 4:02 PM, Vishal Santoshi wrote:
>
> I did a simulation on session windows ( in 2 modes ) and let it rip for
>> about 12 hours
>>
>> 1. Replay where a kafka topic with retention of 7 days was the source (
>> earliest )
>> 2. Start the pipe with kafka source ( latest )
>>
>> I saw results that differed dramatically.
>>
>> On replay the pipeline stalled after  good ramp up while in the second
>> case the pipeline hummed on without issues. For the same time period the
>> data consumed is significantly more in the second case with the WM
>> progression stalled in the first case with no hint of resolution ( the
>> incoming data on source topic far outstrips the WM progression )  I think I
>> know the reasons and this is my hypothesis.
>>
>> In replay mode the number of windows open do not have an upper bound.
>> While buffer exhaustion ( and data in flight with watermark )  is the
>> reason for throttle, it does not really limit the open windows and in fact
>> creates windows that reflect futuristic data ( future is relative to the
>> current WM ) . So if partition x has data for watermark time t(x) and
>> partition y for watermark time t(y) and t(x) << t(y) where the overall
>> watermark is t(x) nothing significantly throttles consumption from the y
>> partition ( in fact for x too ) , the bounded buffer based approach does
>> not give minute control AFAIK as one would hope and that implies there are
>> far more open windows than the system can handle and that leads to the
>> pathological case where the buffers fill up  ( I believe that happens way
>> late ) and throttling occurs but the WM does not proceed and windows that
>> could ease the glut the throttling cannot proceed..... In the replay mode
>> the amount of data implies that the Fetchers keep pulling data at the
>> maximum consumption allowed by the open ended buffer approach.
>>
>> My question thus is, is there any way to have a finer control of back
>> pressure, where in the consumption from a source is throttled preemptively
>> ( by for example decreasing the buffers associated for a pipe or the size
>> allocated ) or sleeps in the Fetcher code that can help aligning the
>> performance to have real time consumption  characteristics
>>
>> Regards,
>>
>> Vishal.

Re: BackPressure handling

Posted by Timo Walther <tw...@apache.org>.
Hi Vishal,

your assumptions sound reasonable to me. The community is currently
working on more fine-grained back pressure with credit-based flow
control. It is on the roadmap for 1.5 [1]/[2]. I will loop in Nico, who
might tell you more about the details. Until then, I guess you have to
implement a custom source or adapt an existing source to let the data
flow in more realistically.

Regards,
Timo

[1] http://flink.apache.org/news/2017/11/22/release-1.4-and-1.5-timeline.html
[2] https://www.youtube.com/watch?v=scStdhz9FHc


On 1/2/18 at 4:02 PM, Vishal Santoshi wrote:
> I did a simulation on session windows ( in 2 modes ) and let it rip 
> for about 12 hours
>
> 1. Replay where a kafka topic with retention of 7 days was the source 
> ( earliest )
> 2. Start the pipe with kafka source ( latest )
>
> I saw results that differed dramatically.
>
> On replay the pipeline stalled after good ramp up while in the second
> case the pipeline hummed on without issues. For the same time period 
> the data consumed is significantly more in the second case with the WM 
> progression stalled in the first case with no hint of resolution ( the 
> incoming data on source topic far outstrips the WM progression ) I
> think I know the reasons and this is my hypothesis.
>
> In replay mode the number of windows open do not have an upper bound. 
> While buffer exhaustion ( and data in flight with watermark ) is the
> reason for throttle, it does not really limit the open windows and in 
> fact creates windows that reflect futuristic data ( future is relative 
> to the current WM ) . So if partition x has data for watermark time 
> t(x) and partition y for watermark time t(y) and t(x) << t(y) where 
> the overall watermark is t(x) nothing significantly throttles 
> consumption from the y partition ( in fact for x too ) , the bounded 
> buffer based approach does not give minute control AFAIK as one would 
> hope and that implies there are far more open windows than the system 
> can handle and that leads to the pathological case where the buffers 
> fill up ( I believe that happens way late ) and throttling occurs but
> the WM does not proceed and windows that could ease the glut the 
> throttling cannot proceed..... In the replay mode the amount of data 
> implies that the Fetchers keep pulling data at the maximum consumption 
> allowed by the open ended buffer approach.
>
> My question thus is, is there any way to have a finer control of back 
> pressure, where in the consumption from a source is throttled 
> preemptively ( by for example decreasing the buffers associated for a 
> pipe or the size allocated ) or sleeps in the Fetcher code that can 
> help aligning the performance to have real time consumption
> characteristics
>
> Regards,
>
> Vishal.
>
>
>
>
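
The workaround suggested in this thread (a custom or adapted source that
lets data flow in more realistically) could be approached by gating each
partition on how far its event time runs ahead of the slowest partition.
The following is only a minimal sketch of that idea in plain Java. The
class name DriftGate, its methods, and the maxDriftMillis parameter are
hypothetical and are not part of Flink or the Kafka connector; wiring it
into an actual source or fetcher loop is left open here.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not a Flink API): tracks the newest event time read
 * per partition and tells the caller whether a partition has run too far
 * ahead of the slowest one.
 */
final class DriftGate {
    private final long maxDriftMillis;
    private final Map<String, Long> latestTimestamp = new HashMap<>();

    DriftGate(long maxDriftMillis) {
        if (maxDriftMillis < 0) {
            throw new IllegalArgumentException("maxDriftMillis must be >= 0");
        }
        this.maxDriftMillis = maxDriftMillis;
    }

    /** Records the event time of the newest record read from a partition. */
    void update(String partition, long eventTimeMillis) {
        latestTimestamp.merge(partition, eventTimeMillis, Long::max);
    }

    /** Smallest per-partition event time among partitions that reported. */
    private long minTimestamp() {
        long min = Long.MAX_VALUE;
        for (long ts : latestTimestamp.values()) {
            min = Math.min(min, ts);
        }
        return min;
    }

    /**
     * Returns true if the partition may keep reading, i.e. it is at most
     * maxDriftMillis ahead of the slowest partition seen so far.
     */
    boolean mayConsume(String partition) {
        Long ts = latestTimestamp.get(partition);
        if (ts == null) {
            return true; // nothing read from this partition yet
        }
        return ts - minTimestamp() <= maxDriftMillis;
    }

    public static void main(String[] args) {
        DriftGate gate = new DriftGate(60_000L); // tolerate one minute of drift
        gate.update("x", 1_000L);   // slow partition, t(x)
        gate.update("y", 500_000L); // fast partition, t(y) >> t(x)
        System.out.println(gate.mayConsume("x")); // true
        System.out.println(gate.mayConsume("y")); // false
    }
}
```

A reading loop would call update() after each record and pause (for
example with a short sleep) while mayConsume() returns false. One known
limitation of this sketch: a partition that stops receiving data holds
the minimum back indefinitely, so a real implementation would also need
some notion of idle partitions.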