Posted to user@flink.apache.org by Giuliano Caliari <gi...@gmail.com> on 2017/02/28 02:19:41 UTC

Flink requesting external web service with rate limited requests

Hello,

I have an interesting problem that I'm having a hard time modeling in Flink,
and I'm not sure if it's the right tool for the job.

I have a stream of messages in Kafka that I need to group and send to an
external web service, but there are two constraints that need to be addressed:

1. Rate-limited requests => only tens of requests per minute are allowed. If
the limit is exceeded, the system has to stop making requests for a few
minutes.
2. Crash handling => I'm using savepoints.

My first (naive) solution was to implement this in a sink function, but the
requests may take a long time to return (up to minutes), so blocking the
thread interferes with the savepoint mechanism (see here
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Rate-limit-processing-td11174.html>
). Because of this, implementing the limit in the sink and relying on
backpressure to slow down the flow gets in the way of savepointing. I'm
not sure how big a problem this will be, but in my tests I'm reading
thousands of messages before the backpressure mechanism kicks in, and
savepointing takes around 20 minutes.
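
For reference, the naive version was essentially a blocking sink along
these lines (a simplified sketch, not my actual code; callWebService()
and the delay constant are just placeholders):

    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class RateLimitedWsSink extends RichSinkFunction<String> {

        // placeholder: roughly 20 requests per minute
        private static final long MIN_MILLIS_BETWEEN_REQUESTS = 3000L;

        @Override
        public void invoke(String record) throws Exception {
            // Blocks the task thread: first to stay under the rate limit,
            // then for the (possibly minutes-long) web service call.
            Thread.sleep(MIN_MILLIS_BETWEEN_REQUESTS);
            callWebService(record);
        }

        private void callWebService(String record) {
            // blocking HTTP call, can take up to minutes to return
        }
    }

While invoke() is blocked the task cannot process checkpoint barriers, and
backpressure piles up thousands of records in front of them, which seems to
be where the 20 minute savepoints come from.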

My second implementation sleeps in the Fetcher of the Kafka consumer, but
the web service response times have a huge variance, so I ended up
implementing a communication channel between the sink and the source - an
object with mutable state. Not great.

So my question is: is there a nice way to limit the flow of messages through
the system according to the rate dictated by a sink function? Is there any
other way I could make this work in Flink?

Thank you




Re: Flink requesting external web service with rate limited requests

Posted by Fabian Hueske <fh...@gmail.com>.
Hi,

I assume the problem with the slow savepoints is that the checkpoint
barriers, which ensure the consistency of the savepoint, get stuck behind
the records that are buffered due to backpressure. At some point the
savepoint might get cancelled because it does not seem to make progress.
You can reduce the amount of data that is buffered due to backpressure by
reducing the number of network buffers (taskmanager.network.numberOfBuffers)
[1].
This will help the barriers reach the operators faster.
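
For example, in flink-conf.yaml (the value below is only illustrative; the
right number depends on your parallelism and topology):

    taskmanager.network.numberOfBuffers: 1024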

I don't think there is a ready-to-go way to integrate Kafka offsets with a
web service response.
You can of course always implement your own source function but that's a
bit of work.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.2/setup/config.html#jobmanager-amp-taskmanager

2017-03-01 0:58 GMT+01:00 Giuliano Caliari <gi...@gmail.com>:

> Hey Fabian,
>
> One of my solutions implements the AsyncFunction but I'm still unable to
> savepoint because Flink reads the backed up records, thousands of
> historical
> records, right off the bat and when I issue a savepoint request it has to
> wait for all those records to be processed which takes a couple of hours.
> So
> I'm still getting the error when savepointing.
> Alternatively I could wait for the backed up records to be processed and
> issue savepoints afterwards but there is a risk of failures and I would
> have
> to restart the whole process.
>
> Another idea would be if we could commit the Kafka offset only after we get
> a positive response from the external web service. There would be some
> duplication in case of errors but that's acceptable. Is there any easy way
> we can do this?
>
> Cheers,
>
>
>
> --
> View this message in context: http://apache-flink-user-
> mailing-list-archive.2336050.n4.nabble.com/Flink-requesting-external-web-
> service-with-rate-limited-requests-tp11952p11977.html
> Sent from the Apache Flink User Mailing List archive. mailing list archive
> at Nabble.com.
>

Re: Flink requesting external web service with rate limited requests

Posted by Giuliano Caliari <gi...@gmail.com>.
Hey Fabian,

One of my solutions implements an AsyncFunction, but I'm still unable to
savepoint because Flink reads the backed-up records (thousands of historical
records) right off the bat, and when I issue a savepoint request it has to
wait for all those records to be processed, which takes a couple of hours.
So I'm still getting the error when savepointing.
Alternatively, I could wait for the backed-up records to be processed and
issue savepoints afterwards, but there is a risk of failures and I would
have to restart the whole process.

Another idea would be to commit the Kafka offset only after we get a
positive response from the external web service. There would be some
duplication in case of errors, but that's acceptable. Is there an easy way
to do this?

Cheers,




Re: Flink requesting external web service with rate limited requests

Posted by Fabian Hueske <fh...@gmail.com>.
A SourceFunction may only emit records while it holds the checkpointLock
(just as `ContinuousFileMonitoringFunction` does).
Flink only takes a checkpoint while it holds that lock, so a sleep() outside
the synchronized block does not block checkpoints. This ensures correct
behavior.
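
For illustration, a throttled source could look roughly like this (an
untested sketch; ThrottledSource and fetchNextRecord() are placeholders
for your own fetching logic):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    public class ThrottledSource implements SourceFunction<String> {

        private volatile boolean isRunning = true;
        private final long pauseMillis;

        public ThrottledSource(long pauseMillis) {
            this.pauseMillis = pauseMillis;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            final Object lock = ctx.getCheckpointLock();
            while (isRunning) {
                synchronized (lock) {
                    // emit records (and update any offsets/state) only
                    // while holding the checkpoint lock
                    ctx.collect(fetchNextRecord());
                }
                // sleeping outside the lock lets checkpoints proceed
                Thread.sleep(pauseMillis);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }

        private String fetchNextRecord() {
            return "..."; // placeholder
        }
    }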

Best, Fabian



Re: Flink requesting external web service with rate limited requests

Posted by Yassine MARZOUGUI <y....@mindlytix.com>.
Hi Fabian,

I have a related question regarding throttling at the source: If there is a
sleep in the source as in ContinuousFileMonitoringFunction.java
<https://github.com/ymarzougui/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L198>:

    while (isRunning) {
        synchronized (checkpointLock) {
            monitorDirAndForwardSplits(fileSystem, context);
        }
        Thread.sleep(interval);
    }

Does it also block checkpoints?
Thanks.

Best,
Yassine


Re: Flink requesting external web service with rate limited requests

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Giuliano,

Flink 1.2 introduced the AsyncFunction, which sends requests to external
systems (k-v stores, web services, etc.) asynchronously.
You can limit the number of concurrent requests, but AFAIK you cannot
specify a limit of requests per minute.
Maybe you can configure the function such that it works for your use case.
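
Roughly like this (a sketch from memory of the 1.2 async I/O API, so
please double-check the signatures; WsRequestFunction and callWebService()
are placeholders):

    import java.util.Collections;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
    import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;

    public class WsRequestFunction extends RichAsyncFunction<String, String> {

        private transient ExecutorService executor;

        @Override
        public void open(Configuration parameters) throws Exception {
            // dedicated pool for the blocking HTTP calls so they do not
            // starve shared thread pools
            executor = Executors.newFixedThreadPool(10);
        }

        @Override
        public void close() throws Exception {
            executor.shutdown();
        }

        @Override
        public void asyncInvoke(String request,
                                AsyncCollector<String> collector) throws Exception {
            // the request runs on the executor, not on the task thread
            CompletableFuture
                .supplyAsync(() -> callWebService(request), executor)
                .thenAccept(response -> collector.collect(Collections.singleton(response)));
        }

        private String callWebService(String request) {
            return "..."; // placeholder; a single call may take minutes
        }
    }

You would then wire it in with something like
AsyncDataStream.unorderedWait(requests, new WsRequestFunction(), 10,
TimeUnit.MINUTES, 10), where the last argument caps the number of in-flight
requests (but, as said, not the requests per minute).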

Alternatively, you can take it as a blueprint for a custom operator, because
it handles watermarks and checkpoints correctly.

I am not aware of a built-in mechanism to throttle a stream. You can do it
manually and simply sleep() in a MapFunction but that will also block
checkpoints.

Best, Fabian
