Posted to user@flink.apache.org by "Aggarwal, Ajay" <Aj...@netapp.com> on 2019/02/13 16:50:34 UTC

Impact of occasional big pauses in stream processing

I was wondering what the impact is if one of the stream operator functions occasionally takes too long to process an event.  Given the following simple Flink job:

       inputStream
          .keyBy("tenantId")
          .process(new MyKeyedProcessFunction())

if MyKeyedProcessFunction occasionally takes too long (say ~5-10 minutes) to process an incoming element, what is the impact on the overall pipeline? Is the impact limited to:

  1.  The specific key for which MyKeyedProcessFunction is currently taking too long to process an element, or
  2.  The specific task slot where MyKeyedProcessFunction is currently taking too long to process an element, i.e. impacting multiple keys, or
  3.  The entire inputStream?

Also, what is the built-in resiliency in these cases? Is there a concept of a timeout for each operator function?

Ajay

Re: Impact of occasional big pauses in stream processing

Posted by Till Rohrmann <tr...@apache.org>.
Hi Ajay,

Rong and Andrey are right. Backpressure can eventually be propagated up
until it reaches the sources, where it slows down reading from your
external system. But as long as your cluster is only temporarily
under-provisioned for your use case, the system should be able to catch
up with the stream again.

Concerning your question about job isolation: when running multiple jobs
on a session cluster and a single job is backpressured, it won't affect
the other jobs in terms of backpressure. However, when using a session
cluster you always risk that different jobs affect each other slightly,
because they are executed in the same JVM processes. For example, if one
job is quite hungry in terms of heap space, it might affect other jobs by
claiming a disproportionate share of the heap. The best way to ensure job
isolation is to use Flink's job mode, where each job is executed on a
dedicated cluster.
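
For example, with YARN a dedicated per-job cluster can be started straight
from the CLI (the jar path and parallelism below are placeholders):

    ./bin/flink run -m yarn-cluster -p 4 /path/to/tenant-work-job.jar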

Cheers,
Till

On Fri, Feb 15, 2019 at 12:43 AM Rong Rong <wa...@gmail.com> wrote:

> Hi Ajay,
>
> Yes, Andrey is right. I was actually missing the first basic but important
> point: If your process function is stuck, it will immediately block that
> thread.
> From your description, what it sounds like is that not all the messages
> you consume from kafka actually triggers the processing logic. There should
> be plenty of way to avoid over provisioning your job just to satisfy your
> peak traffic: for example what Andrey suggested, using a Async RPC call to
> some other resource for the heavy computation; or split it into a filter
> (which removes non-actionable messages) and the actual process (where you
> can use a higher parallelism to reduce chances of stuck).
>
> Regarding the second question, I am not expert but my understanding is
> that there should be isolations between Flink jobs you run on that session
> cluster. e.g. one job's backpressure will not affect other jobs' consumer.
> I've CCed Till who might be able to better answer your question.
>
> --
> Rong
>
>
> On Thu, Feb 14, 2019 at 8:24 AM Aggarwal, Ajay <Aj...@netapp.com>
> wrote:
>
>> Thank you Rong and Andrey. The blog and your explanation was very useful.
>>
>>
>>
>> In my use case, source stream (kafka based) contains messages that
>> capture some “work” that needs to be done for a tenant.  It’s a
>> multi-tenant source stream. I need to queue up (and execute) this work per
>> tenant in the order in which it was produced. And flink provides this
>> ordered queuing per tenant very elegantly. Now the only thing is that
>> executing this “work” could be expensive in terms of compute/memory/time.
>> Furthermore per tenant there is a constraint of doing this work serially.
>> Hence this question.  I believe if our flink cluster has enough resources,
>> it should work.
>>
>>
>>
>> But this leads to another related question. If there are multiple flink
>> jobs sharing the same flink cluster and one of those jobs sees the spike
>> such that back pressure builds up all the way to the source, will that
>> impact other jobs as well? Is a task slot shared by multiple jobs? If not,
>> my understanding is that this should not impact other flink jobs. Is that
>> correct?
>>
>>
>>
>> Thanks.
>>
>>
>>
>> Ajay
>>
>>
>>
>> *From: *Andrey Zagrebin <an...@ververica.com>
>> *Date: *Thursday, February 14, 2019 at 5:09 AM
>> *To: *Rong Rong <wa...@gmail.com>
>> *Cc: *"Aggarwal, Ajay" <Aj...@netapp.com>, "user@flink.apache.org"
>> <us...@flink.apache.org>
>> *Subject: *Re: Impact of occasional big pauses in stream processing
>>
>>
>>
>> Hi Ajay,
>>
>>
>>
>> Technically, it will immediately block the thread of
>> MyKeyedProcessFunction subtask scheduled to some slot and basically block
>> processing of the key range assigned to this subtask.
>> Practically, I agree with Rong's answer. Depending on the topology of
>> your inputStream, it can eventually block a lot of stuff.
>> In general, I think, it is not recommended to perform blocking operations
>> in process record functions. You could consider AsyncIO [1] to unblock the
>> task thread.
>>
>> Best,
>>
>> Andrey
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>
>>
>>
>> On Thu, Feb 14, 2019 at 6:03 AM Rong Rong <wa...@gmail.com> wrote:
>>
>> Hi Ajay,
>>
>>
>>
>> Flink handles "backpressure" in a graceful way so that it doesn't get
>> affected when your processing pipeline is occasionally slowed down.
>>
>> I think the following articles will help [1,2].
>>
>>
>>
>> In your specific case: the "KeyBy" operation will re-hash data so they
>> can be reshuffled from all input consumers to all your process operators
>> (in this case the MyKeyedProcessFunction). If one of the process operator
>> is backpressured, it will back track all the way to the source.
>>
>> So, my understanding is that: since there's the reshuffling, if one of
>> the process function is backpressured, it will potentially affect all the
>> source operators.
>>
>>
>>
>> Thanks,
>>
>> Rong
>>
>>
>>
>> [1] https://www.ververica.com/blog/how-flink-handles-backpressure
>>
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html
>>
>>
>>
>> On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay <Aj...@netapp.com>
>> wrote:
>>
>> I was wondering what is the impact if one of the stream operator function
>> occasionally takes too long to process the event.  Given the following
>> simple flink job
>>
>>
>>
>>        inputStream
>>
>>           .KeyBy (“tenantId”)
>>
>>           .process ( new MyKeyedProcessFunction())
>>
>>
>>
>> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
>> minutes) to process an incoming element, what is the impact on overall
>> pipeline? Is the impact limited to
>>
>>    1. Specific key for which MyKeyedProcessFunction is currently taking
>>    too long to process an element, or
>>    2. Specific Taskslot, where MyKeyedProcessFunction is currently
>>    taking too long to process an element, i.e. impacting multiple keys, or
>>    3. Entire inputstream ?
>>
>>
>>
>> Also what is the built in resiliency in these cases? Is there a concept
>> of timeout for each operator function?
>>
>>
>>
>> Ajay
>>
>>

Re: Impact of occasional big pauses in stream processing

Posted by Rong Rong <wa...@gmail.com>.
Hi Ajay,

Yes, Andrey is right. I was actually missing the first basic but important
point: if your process function is stuck, it will immediately block that
thread.
From your description, it sounds like not all the messages you consume
from Kafka actually trigger the heavy processing logic. There should be
plenty of ways to avoid over-provisioning your job just to satisfy your
peak traffic: for example, as Andrey suggested, using an async RPC call to
some other resource for the heavy computation; or splitting the job into a
filter (which removes non-actionable messages) and the actual process
function (where you can use a higher parallelism to reduce the chance of
getting stuck).
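
A rough sketch of that second option, purely illustrative (WorkMessage,
isActionable(), env, kafkaConsumer and the parallelism figure are made-up
placeholders, not from this thread):

    DataStream<WorkMessage> inputStream = env.addSource(kafkaConsumer);  // your existing Kafka source

    inputStream
        .filter(msg -> msg.isActionable())      // cheap check, drops non-actionable messages early
        .keyBy("tenantId")                      // per-tenant ordering is still preserved within each key
        .process(new MyKeyedProcessFunction())
        .setParallelism(16);                    // only the expensive stage gets the higher parallelism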

Regarding the second question, I am not an expert, but my understanding is
that there should be isolation between the Flink jobs you run on that
session cluster, e.g. one job's backpressure will not affect other jobs'
consumers. I've CCed Till, who might be able to better answer your
question.

--
Rong


On Thu, Feb 14, 2019 at 8:24 AM Aggarwal, Ajay <Aj...@netapp.com>
wrote:

> Thank you Rong and Andrey. The blog and your explanation was very useful.
>
>
>
> In my use case, source stream (kafka based) contains messages that capture
> some “work” that needs to be done for a tenant.  It’s a multi-tenant source
> stream. I need to queue up (and execute) this work per tenant in the order
> in which it was produced. And flink provides this ordered queuing per
> tenant very elegantly. Now the only thing is that executing this “work”
> could be expensive in terms of compute/memory/time.  Furthermore per tenant
> there is a constraint of doing this work serially. Hence this question.  I
> believe if our flink cluster has enough resources, it should work.
>
>
>
> But this leads to another related question. If there are multiple flink
> jobs sharing the same flink cluster and one of those jobs sees the spike
> such that back pressure builds up all the way to the source, will that
> impact other jobs as well? Is a task slot shared by multiple jobs? If not,
> my understanding is that this should not impact other flink jobs. Is that
> correct?
>
>
>
> Thanks.
>
>
>
> Ajay
>
>
>
> *From: *Andrey Zagrebin <an...@ververica.com>
> *Date: *Thursday, February 14, 2019 at 5:09 AM
> *To: *Rong Rong <wa...@gmail.com>
> *Cc: *"Aggarwal, Ajay" <Aj...@netapp.com>, "user@flink.apache.org"
> <us...@flink.apache.org>
> *Subject: *Re: Impact of occasional big pauses in stream processing
>
>
>
> Hi Ajay,
>
>
>
> Technically, it will immediately block the thread of
> MyKeyedProcessFunction subtask scheduled to some slot and basically block
> processing of the key range assigned to this subtask.
> Practically, I agree with Rong's answer. Depending on the topology of your
> inputStream, it can eventually block a lot of stuff.
> In general, I think, it is not recommended to perform blocking operations
> in process record functions. You could consider AsyncIO [1] to unblock the
> task thread.
>
> Best,
>
> Andrey
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>
>
>
> On Thu, Feb 14, 2019 at 6:03 AM Rong Rong <wa...@gmail.com> wrote:
>
> Hi Ajay,
>
>
>
> Flink handles "backpressure" in a graceful way so that it doesn't get
> affected when your processing pipeline is occasionally slowed down.
>
> I think the following articles will help [1,2].
>
>
>
> In your specific case: the "KeyBy" operation will re-hash data so they can
> be reshuffled from all input consumers to all your process operators (in
> this case the MyKeyedProcessFunction). If one of the process operator is
> backpressured, it will back track all the way to the source.
>
> So, my understanding is that: since there's the reshuffling, if one of the
> process function is backpressured, it will potentially affect all the
> source operators.
>
>
>
> Thanks,
>
> Rong
>
>
>
> [1] https://www.ververica.com/blog/how-flink-handles-backpressure
>
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html
>
>
>
> On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay <Aj...@netapp.com>
> wrote:
>
> I was wondering what is the impact if one of the stream operator function
> occasionally takes too long to process the event.  Given the following
> simple flink job
>
>
>
>        inputStream
>
>           .KeyBy (“tenantId”)
>
>           .process ( new MyKeyedProcessFunction())
>
>
>
> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
> minutes) to process an incoming element, what is the impact on overall
> pipeline? Is the impact limited to
>
>    1. Specific key for which MyKeyedProcessFunction is currently taking
>    too long to process an element, or
>    2. Specific Taskslot, where MyKeyedProcessFunction is currently taking
>    too long to process an element, i.e. impacting multiple keys, or
>    3. Entire inputstream ?
>
>
>
> Also what is the built in resiliency in these cases? Is there a concept of
> timeout for each operator function?
>
>
>
> Ajay
>
>

Re: Impact of occasional big pauses in stream processing

Posted by "Aggarwal, Ajay" <Aj...@netapp.com>.
Thank you Rong and Andrey. The blog and your explanations were very useful.

In my use case, the source stream (Kafka based) contains messages that capture some “work” that needs to be done for a tenant.  It’s a multi-tenant source stream. I need to queue up (and execute) this work per tenant in the order in which it was produced, and Flink provides this ordered queuing per tenant very elegantly. The only catch is that executing this “work” could be expensive in terms of compute/memory/time.  Furthermore, per tenant there is a constraint that this work be done serially. Hence this question.  I believe that if our Flink cluster has enough resources, it should work.
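
A minimal sketch of that per-tenant serial execution inside the keyed
process function (WorkMessage, WorkResult and executeWork below are made-up
placeholders):

    public class MyKeyedProcessFunction extends KeyedProcessFunction<Tuple, WorkMessage, WorkResult> {
        // Key type is Tuple because keyBy("tenantId") uses a field expression.

        @Override
        public void processElement(WorkMessage msg, Context ctx, Collector<WorkResult> out) throws Exception {
            // keyBy("tenantId") routes every message of a tenant to the same subtask, and the
            // subtask calls processElement() for one element at a time, so work for a tenant
            // runs serially and in arrival order. The flip side: a 5-10 minute call here blocks
            // this subtask, i.e. every tenant whose key hashes to it.
            out.collect(executeWork(msg));
        }

        private WorkResult executeWork(WorkMessage msg) {
            // placeholder for the expensive per-tenant "work"
            return new WorkResult(msg.getTenantId());
        }
    }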

But this leads to another related question. If there are multiple Flink jobs sharing the same Flink cluster and one of those jobs sees a spike such that backpressure builds up all the way to the source, will that impact the other jobs as well? Is a task slot shared by multiple jobs? If not, my understanding is that this should not impact other Flink jobs. Is that correct?

Thanks.

Ajay

From: Andrey Zagrebin <an...@ververica.com>
Date: Thursday, February 14, 2019 at 5:09 AM
To: Rong Rong <wa...@gmail.com>
Cc: "Aggarwal, Ajay" <Aj...@netapp.com>, "user@flink.apache.org" <us...@flink.apache.org>
Subject: Re: Impact of occasional big pauses in stream processing

Hi Ajay,

Technically, it will immediately block the thread of MyKeyedProcessFunction subtask scheduled to some slot and basically block processing of the key range assigned to this subtask.
Practically, I agree with Rong's answer. Depending on the topology of your inputStream, it can eventually block a lot of stuff.
In general, I think, it is not recommended to perform blocking operations in process record functions. You could consider AsyncIO [1] to unblock the task thread.

Best,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Thu, Feb 14, 2019 at 6:03 AM Rong Rong <wa...@gmail.com> wrote:
Hi Ajay,

Flink handles "backpressure" in a graceful way so that it doesn't get affected when your processing pipeline is occasionally slowed down.
I think the following articles will help [1,2].

In your specific case: the "KeyBy" operation will re-hash data so they can be reshuffled from all input consumers to all your process operators (in this case the MyKeyedProcessFunction). If one of the process operator is backpressured, it will back track all the way to the source.
So, my understanding is that: since there's the reshuffling, if one of the process function is backpressured, it will potentially affect all the source operators.

Thanks,
Rong

[1] https://www.ververica.com/blog/how-flink-handles-backpressure
[2] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html

On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay <Aj...@netapp.com> wrote:
I was wondering what is the impact if one of the stream operator function occasionally takes too long to process the event.  Given the following simple flink job

       inputStream
          .KeyBy (“tenantId”)
          .process ( new MyKeyedProcessFunction())

, if occasionally MyKeyedProcessFunction takes too long (say ~5-10 minutes) to process an incoming element, what is the impact on overall pipeline? Is the impact limited to

  1.  Specific key for which MyKeyedProcessFunction is currently taking too long to process an element, or
  2.  Specific Taskslot, where MyKeyedProcessFunction is currently taking too long to process an element, i.e. impacting multiple keys, or
  3.  Entire inputstream ?

Also what is the built in resiliency in these cases? Is there a concept of timeout for each operator function?

Ajay

Re: Impact of occasional big pauses in stream processing

Posted by Andrey Zagrebin <an...@ververica.com>.
Hi Ajay,

Technically, it will immediately block the thread of the
MyKeyedProcessFunction subtask scheduled to some slot and basically block
processing of the key range assigned to this subtask.
Practically, I agree with Rong's answer: depending on the topology of your
inputStream, it can eventually block a lot of stuff.
In general, I think it is not recommended to perform blocking operations
in record-processing functions. You could consider AsyncIO [1] to unblock
the task thread.
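
A minimal sketch of the async I/O route, only to illustrate the API shape
(AsyncWorkFunction, WorkMessage/WorkResult and the timeout/capacity figures
are assumptions, not a concrete recommendation):

    public class AsyncWorkFunction extends RichAsyncFunction<WorkMessage, WorkResult> {

        private transient ExecutorService executor;   // stand-in for a real async client

        @Override
        public void open(Configuration parameters) {
            executor = Executors.newFixedThreadPool(10);
        }

        @Override
        public void close() {
            executor.shutdown();
        }

        @Override
        public void asyncInvoke(WorkMessage msg, ResultFuture<WorkResult> resultFuture) {
            CompletableFuture
                .supplyAsync(() -> doExpensiveWork(msg), executor)   // the long-running call, off the task thread
                .thenAccept(result -> resultFuture.complete(Collections.singleton(result)));
        }

        private WorkResult doExpensiveWork(WorkMessage msg) {
            return new WorkResult(msg);   // placeholder for the heavy computation
        }
    }

    // Overall timeout per async call; capacity bounds the number of in-flight requests.
    DataStream<WorkResult> results = AsyncDataStream.orderedWait(
        inputStream, new AsyncWorkFunction(), 15, TimeUnit.MINUTES, 100);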

Best,
Andrey

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html

On Thu, Feb 14, 2019 at 6:03 AM Rong Rong <wa...@gmail.com> wrote:

> Hi Ajay,
>
> Flink handles "backpressure" in a graceful way so that it doesn't get
> affected when your processing pipeline is occasionally slowed down.
> I think the following articles will help [1,2].
>
> In your specific case: the "KeyBy" operation will re-hash data so they can
> be reshuffled from all input consumers to all your process operators (in
> this case the MyKeyedProcessFunction). If one of the process operator is
> backpressured, it will back track all the way to the source.
> So, my understanding is that: since there's the reshuffling, if one of the
> process function is backpressured, it will potentially affect all the
> source operators.
>
> Thanks,
> Rong
>
> [1] https://www.ververica.com/blog/how-flink-handles-backpressure
> [2]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html
>
> On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay <Aj...@netapp.com>
> wrote:
>
>> I was wondering what is the impact if one of the stream operator function
>> occasionally takes too long to process the event.  Given the following
>> simple flink job
>>
>>
>>
>>        inputStream
>>
>>           .KeyBy (“tenantId”)
>>
>>           .process ( new MyKeyedProcessFunction())
>>
>>
>>
>> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
>> minutes) to process an incoming element, what is the impact on overall
>> pipeline? Is the impact limited to
>>
>>    1. Specific key for which MyKeyedProcessFunction is currently taking
>>    too long to process an element, or
>>    2. Specific Taskslot, where MyKeyedProcessFunction is currently
>>    taking too long to process an element, i.e. impacting multiple keys, or
>>    3. Entire inputstream ?
>>
>>
>>
>> Also what is the built in resiliency in these cases? Is there a concept
>> of timeout for each operator function?
>>
>>
>>
>> Ajay
>>
>

Re: Impact of occasional big pauses in stream processing

Posted by Rong Rong <wa...@gmail.com>.
Hi Ajay,

Flink handles "backpressure" in a graceful way so that it doesn't get
affected when your processing pipeline is occasionally slowed down.
I think the following articles will help [1,2].

In your specific case: the "keyBy" operation re-hashes the data so it can
be reshuffled from all input consumers to all of your process operators (in
this case, MyKeyedProcessFunction). If one of the process operators is
backpressured, the backpressure will propagate all the way back to the
sources.
So, my understanding is: since there is reshuffling, if one of the process
functions is backpressured, it will potentially affect all the source
operators.
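
To make the topology concrete (the parallelism figures and kafkaConsumer
below are made up, only to show the all-to-all shuffle introduced by the
keyBy):

    env.addSource(kafkaConsumer).setParallelism(4)   // 4 source subtasks (example figure)
       .keyBy("tenantId")                            // hash shuffle: each source subtask can send to each process subtask
       .process(new MyKeyedProcessFunction())
       .setParallelism(8);                           // 8 process subtasks; if one stalls, its input buffers fill up
                                                     // and the backpressure eventually reaches all 4 sources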

Thanks,
Rong

[1] https://www.ververica.com/blog/how-flink-handles-backpressure
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/back_pressure.html

On Wed, Feb 13, 2019 at 8:50 AM Aggarwal, Ajay <Aj...@netapp.com>
wrote:

> I was wondering what is the impact if one of the stream operator function
> occasionally takes too long to process the event.  Given the following
> simple flink job
>
>
>
>        inputStream
>
>           .KeyBy (“tenantId”)
>
>           .process ( new MyKeyedProcessFunction())
>
>
>
> , if occasionally MyKeyedProcessFunction takes too long (say ~5-10
> minutes) to process an incoming element, what is the impact on overall
> pipeline? Is the impact limited to
>
>    1. Specific key for which MyKeyedProcessFunction is currently taking
>    too long to process an element, or
>    2. Specific Taskslot, where MyKeyedProcessFunction is currently taking
>    too long to process an element, i.e. impacting multiple keys, or
>    3. Entire inputstream ?
>
>
>
> Also what is the built in resiliency in these cases? Is there a concept of
> timeout for each operator function?
>
>
>
> Ajay
>