Posted to user@flink.apache.org by Florian König <fl...@micardo.com> on 2017/01/20 09:53:39 UTC

Rate-limit processing

Hi,

I need to limit the rate of processing in a Flink stream application. Specifically, the number of items processed in a .map() operation has to stay under a certain maximum per second.

At the moment, I have another .map() operation before the actual processing, which just sleeps for a certain time (e.g., 250ms for a limit of 4 requests / sec) and returns the item unchanged:

…

public T map(final T value) throws Exception {
	Thread.sleep(delay);
	return value;
}

…

This works as expected, but is a rather crude approach. Checkpointing the job takes a very long time: minutes for a state of a few kB, which for other jobs is done in a few milliseconds. I assume that letting the whole thread sleep for most of the time interferes with the checkpointing - not good!

Would using a different synchronization mechanism (e.g., https://google.github.io/guava/releases/19.0/api/docs/index.html?com/google/common/util/concurrent/RateLimiter.html) help to make checkpointing work better?

Or, preferably, is there a mechanism inside Flink that I can use to accomplish the desired rate limiting? I haven’t found anything in the docs.

Cheers,
Florian

Re: Rate-limit processing

Posted by Robert Metzger <rm...@apache.org>.
Hi Florian,

You can rate-limit the Kafka consumer by implementing a custom
DeserializationSchema that sleeps briefly from time to time (or at each
deserialization step).
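
A minimal sketch of the sleeping-deserializer idea, under stated assumptions: BytesDeserializer is a hypothetical stand-in for Flink's DeserializationSchema (it is not the Flink interface), and ThrottlingDeserializer and maxPerSecond are names invented for illustration. The wrapper paces calls so that at most maxPerSecond messages are deserialized per second. How the sleep interacts with checkpointing depends on where the connector invokes the schema, which this sketch does not model.

```java
import java.nio.charset.StandardCharsets;

/** Wraps a deserializer and sleeps so that calls are spaced at least 1/maxPerSecond apart. */
class ThrottlingDeserializer<T> implements BytesDeserializer<T> {
    private final BytesDeserializer<T> delegate;
    private final long minIntervalNanos;
    private long nextAllowedNanos;

    ThrottlingDeserializer(BytesDeserializer<T> delegate, int maxPerSecond) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        this.delegate = delegate;
        this.minIntervalNanos = 1_000_000_000L / maxPerSecond;
        this.nextAllowedNanos = System.nanoTime(); // the first call may proceed at once
    }

    @Override
    public T deserialize(byte[] message) {
        long waitNanos = nextAllowedNanos - System.nanoTime();
        if (waitNanos > 0) {
            try {
                // Sleep only for the remainder of the current interval.
                Thread.sleep(waitNanos / 1_000_000, (int) (waitNanos % 1_000_000));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt flag, skip the wait
            }
        }
        // Schedule the next slot relative to whichever is later: the plan or the clock.
        nextAllowedNanos = Math.max(nextAllowedNanos, System.nanoTime()) + minIntervalNanos;
        return delegate.deserialize(message);
    }

    public static void main(String[] args) {
        BytesDeserializer<String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        ThrottlingDeserializer<String> throttled = new ThrottlingDeserializer<>(utf8, 4);
        for (int i = 0; i < 3; i++) {
            byte[] raw = ("message-" + i).getBytes(StandardCharsets.UTF_8);
            System.out.println(throttled.deserialize(raw));
        }
    }
}

/** Hypothetical stand-in for a connector's deserialization hook (not a Flink type). */
interface BytesDeserializer<T> {
    T deserialize(byte[] message);
}
```

With a limit of 4 per second the wrapper leaves about 250 ms between calls, the same spacing as the sleeping .map() in the original question, but applied where the records enter the job.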

On Tue, Jan 24, 2017 at 1:16 PM, Florian König <fl...@micardo.com>
wrote:

> Hi Till,
>
> thank you for the very helpful hints. You are right, I already see
> backpressure. In my case, that’s ok because it throttles the Kafka source.
> Speaking of which: You mentioned putting the rate limiting mechanism into
> the source. How can I do this with a Kafka source? Just extend the
> Producer, or is there a better mechanism to hook into the connector?
>
> Cheers,
> Florian

Re: Rate-limit processing

Posted by Florian König <fl...@micardo.com>.
Hi Till,

thank you for the very helpful hints. You are right, I already see backpressure. In my case, that’s ok because it throttles the Kafka source. Speaking of which: You mentioned putting the rate limiting mechanism into the source. How can I do this with a Kafka source? Just extend the Producer, or is there a better mechanism to hook into the connector?

Cheers,
Florian


> On 20.01.2017 at 16:58, Till Rohrmann <tr...@apache.org> wrote:
> 
> Hi Florian,
> 
> Blocking the user code thread is generally not a good idea, because checkpointing happens under the very same lock that also guards the user code invocation. A checkpoint barrier arriving at the operator can therefore only trigger the checkpoint once the blocking is over. Even worse, if the blocking happens in a downstream operator (not a source), it can cause backpressure. Since the checkpoint barriers flow with the events and are processed in order, the backpressure then also increases the checkpointing time.
> 
> So if you want to limit the rate, you should do it at the sources, without blocking the source thread. You could, for example, count how many elements you've emitted in the past second and, if that count exceeds your maximum, not emit the next element to downstream operators until some time has passed (this might end up in a busy loop, but it allows the checkpointing to claim the lock).
> 
> Cheers,
> Till



Re: Rate-limit processing

Posted by Till Rohrmann <tr...@apache.org>.
Hi Florian,

Blocking the user code thread is generally not a good idea, because
checkpointing happens under the very same lock that also guards the user
code invocation. A checkpoint barrier arriving at the operator can
therefore only trigger the checkpoint once the blocking is over. Even
worse, if the blocking happens in a downstream operator (not a source), it
can cause backpressure. Since the checkpoint barriers flow with the events
and are processed in order, the backpressure then also increases the
checkpointing time.
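
The lock contention described above can be reproduced with a toy model in plain Java. This is only an illustration, not Flink code: checkpointLock is a hypothetical stand-in for the shared lock, one thread plays the user code that sleeps while holding it, and the calling thread plays the checkpoint that has to wait for it.

```java
import java.util.concurrent.CountDownLatch;

/** Toy model of one lock shared by user code and checkpointing (not Flink code). */
class CheckpointLockDemo {

    /**
     * Lets a "user code" thread sleep for sleepMillis while holding the lock and
     * returns how long the "checkpoint" had to wait before it got the same lock.
     */
    static long checkpointWaitMillis(long sleepMillis) {
        final Object checkpointLock = new Object(); // hypothetical stand-in for the shared lock
        final CountDownLatch lockHeld = new CountDownLatch(1);

        Thread userCode = new Thread(() -> {
            synchronized (checkpointLock) {
                lockHeld.countDown(); // signal that the lock is now taken
                try {
                    Thread.sleep(sleepMillis); // the throttling sleep from the question
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        userCode.start();

        try {
            lockHeld.await(); // make sure the user code owns the lock first
            long start = System.nanoTime();
            synchronized (checkpointLock) {
                // A real checkpoint would snapshot operator state here.
            }
            long waitedMillis = (System.nanoTime() - start) / 1_000_000;
            userCode.join();
            return waitedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("checkpoint waited about " + checkpointWaitMillis(250) + " ms");
    }
}
```

The wait is roughly as long as the sleep, because the second synchronized block cannot be entered until the sleeping thread releases the lock. With one such sleep per element, every barrier is held up in the same way.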

So if you want to limit the rate, you should do it at the sources, without
blocking the source thread. You could, for example, count how many elements
you've emitted in the past second and, if that count exceeds your maximum,
not emit the next element to downstream operators until some time has
passed (this might end up in a busy loop, but it allows the checkpointing
to claim the lock).
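
The counting approach above could look roughly like the following sketch. EmissionBudget and tryEmit are hypothetical names and nothing here is a Flink API. The class remembers the timestamps of the elements emitted during the past second and refuses, without blocking, once the maximum is reached. A source loop would call tryEmit before each element and take the lock only around the actual emit.

```java
import java.util.ArrayDeque;

/** Non-blocking "how many did I emit in the past second?" counter (a sketch, not a Flink API). */
class EmissionBudget {
    private final int maxPerSecond;
    private final ArrayDeque<Long> recentEmits = new ArrayDeque<>();

    EmissionBudget(int maxPerSecond) {
        if (maxPerSecond <= 0) {
            throw new IllegalArgumentException("maxPerSecond must be positive");
        }
        this.maxPerSecond = maxPerSecond;
    }

    /** Returns true and records the emit if one more element fits into the past second. */
    boolean tryEmit(long nowMillis) {
        // Forget emits that are at least one second old.
        while (!recentEmits.isEmpty() && nowMillis - recentEmits.peekFirst() >= 1000) {
            recentEmits.pollFirst();
        }
        if (recentEmits.size() >= maxPerSecond) {
            return false; // over budget: the caller retries later instead of sleeping
        }
        recentEmits.addLast(nowMillis);
        return true;
    }

    public static void main(String[] args) {
        EmissionBudget budget = new EmissionBudget(4);
        int emitted = 0;
        // Simulated clock: one attempt per millisecond for 1.5 seconds.
        for (long now = 0; now < 1500; now++) {
            if (budget.tryEmit(now)) {
                emitted++; // a real source would emit here, holding the lock only briefly
            }
        }
        System.out.println("emitted " + emitted + " elements");
    }
}
```

Because tryEmit returns immediately, the thread never sits inside a long sleep while holding the lock. The cost is the busy loop mentioned above whenever the budget is exhausted.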

Cheers,
Till

On Fri, Jan 20, 2017 at 12:18 PM, Yassine MARZOUGUI <
y.marzougui@mindlytix.com> wrote:

> Hi,
>
> You might find this similar thread from the mailing list archive helpful:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html
>
> Best,
> Yassine

Re: Rate-limit processing

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi,

You might find this similar thread from the mailing list archive helpful:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/throttled-stream-td6138.html

Best,
Yassine
