You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Akshay Aggarwal <ak...@flipkart.com> on 2020/02/07 12:36:27 UTC

Backfilling From Kafka Topics With Varied Ingestion Rates

Hi Flink Users,

We have a scenario where we're reading from multiple kafka topics using a
single kafka consumer. Each topic has a very different ingestion rate, like
CheckoutTopic has 500 rec/sec, PageViewTopic has 10,000 rec/sec. We are
performing ordering of these events across topics using a keyed process
function (keyed on userId) and a EVENT_TIME watermark which is based on the
ingestionTs of the record captured just before it is produced into kafka.

On live data this pipeline works perfectly, but if I restart the job to
process from an old savepoint (say 24hrs old), the job fills up the state,
a full back pressure (ratio 1) gets created on the source operators,
checkpoints start failing and the job eventually dies. My hypothesis is
that the data from both the topics are read at the max rate possible, but
since the watermark from the PageViewTopic will lag significantly behind
the CheckoutTopic overall watermarks don't progress, excessive data
from CheckoutTopic fills up the state and results in the failure mentioned
above.

I also observed this while backfilling from a savepoint using a single
topic, even though watermarks do progress faster than before, the job has
the same fate. In this case I'm assuming the offsets/watermarks of the
individual partitions go out-of-sync with respect to time leading to a
similar situation mentioned above.

Is this understanding correct? is there a known solution for this? And if
not, what is the suggested approach to tackle this problem?

Thanks & Regards,
Akshay Aggarwal

-- 



*-----------------------------------------------------------------------------------------*


*This email and any files transmitted with it are confidential and 
intended solely for the use of the individual or entity to whom they are 
addressed. If you have received this email in error, please notify the 
system manager. This message contains confidential information and is 
intended only for the individual named. If you are not the named addressee, 
you should not disseminate, distribute or copy this email. Please notify 
the sender immediately by email if you have received this email by mistake 
and delete this email from your system. If you are not the intended 
recipient, you are notified that disclosing, copying, distributing or 
taking any action in reliance on the contents of this information is 
strictly prohibited.*****

 ****

*Any views or opinions presented in this 
email are solely those of the author and do not necessarily represent those 
of the organization. Any information on shares, debentures or similar 
instruments, recommended product pricing, valuations and the like are for 
information purposes only. It is not meant to be an instruction or 
recommendation, as the case may be, to buy or to sell securities, products, 
services nor an offer to buy or sell securities, products or services 
unless specifically stated to be so on behalf of the Flipkart group. 
Employees of the Flipkart group of companies are expressly required not to 
make defamatory statements and not to infringe or authorise any 
infringement of copyright or any other legal right by email communications. 
Any such communication is contrary to organizational policy and outside the 
scope of the employment of the individual concerned. The organization will 
not accept any liability in respect of such communication, and the employee 
responsible will be personally liable for any damages or other liability 
arising.*****

 ****

*Our organization accepts no liability for the 
content of this email, or for the consequences of any actions taken on the 
basis of the information *provided,* unless that information is 
subsequently confirmed in writing. If you are not the intended recipient, 
you are notified that disclosing, copying, distributing or taking any 
action in reliance on the contents of this information is strictly 
prohibited.*


_-----------------------------------------------------------------------------------------_


Re: Backfilling From Kafka Topics With Varied Ingestion Rates

Posted by Robert Metzger <rm...@apache.org>.
You can find more information here:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

On Wed, Feb 12, 2020 at 11:30 AM Akshay Aggarwal <
akshay.aggarwal@flipkart.com> wrote:

> Thanks Aljoscha. Is there a JIRA where this is getting tracked?
>
> ~Akshay
>
> On Wed, Feb 12, 2020 at 1:56 PM Aljoscha Krettek <al...@apache.org>
> wrote:
>
>> Hi,
>>
>> I'm afraid your analysis is 100% correct. Currently there's no
>> out-of-box feature for dealing with this but our work on a new source
>> interface ([1]) will enable us to add a feature that we call "event-time
>> alignment" where source readers would slow down reading from certain
>> source partitions if their watermark advances to far beyond the minimum
>> watermark over all source partitions.
>>
>> Best,
>> Aljoscha
>>
>> On 07.02.20 13:36, Akshay Aggarwal wrote:
>> > Hi Flink Users,
>> >
>> > We have a scenario where we're reading from multiple kafka topics using
>> a
>> > single kafka consumer. Each topic has a very different ingestion rate,
>> like
>> > CheckoutTopic has 500 rec/sec, PageViewTopic has 10,000 rec/sec. We are
>> > performing ordering of these events across topics using a keyed process
>> > function (keyed on userId) and a EVENT_TIME watermark which is based on
>> the
>> > ingestionTs of the record captured just before it is produced into
>> kafka.
>> >
>> > On live data this pipeline works perfectly, but if I restart the job to
>> > process from an old savepoint (say 24hrs old), the job fills up the
>> state,
>> > a full back pressure (ratio 1) gets created on the source operators,
>> > checkpoints start failing and the job eventually dies. My hypothesis is
>> > that the data from both the topics are read at the max rate possible,
>> but
>> > since the watermark from the PageViewTopic will lag significantly behind
>> > the CheckoutTopic overall watermarks don't progress, excessive data
>> > from CheckoutTopic fills up the state and results in the failure
>> mentioned
>> > above.
>> >
>> > I also observed this while backfilling from a savepoint using a single
>> > topic, even though watermarks do progress faster than before, the job
>> has
>> > the same fate. In this case I'm assuming the offsets/watermarks of the
>> > individual partitions go out-of-sync with respect to time leading to a
>> > similar situation mentioned above.
>> >
>> > Is this understanding correct? is there a known solution for this? And
>> if
>> > not, what is the suggested approach to tackle this problem?
>> >
>> > Thanks & Regards,
>> > Akshay Aggarwal
>> >
>>
>
>
> *-----------------------------------------------------------------------------------------*
>
> *This email and any files transmitted with it are confidential and
> intended solely for the use of the individual or entity to whom they are
> addressed. If you have received this email in error, please notify the
> system manager. This message contains confidential information and is
> intended only for the individual named. If you are not the named addressee,
> you should not disseminate, distribute or copy this email. Please notify
> the sender immediately by email if you have received this email by mistake
> and delete this email from your system. If you are not the intended
> recipient, you are notified that disclosing, copying, distributing or
> taking any action in reliance on the contents of this information is
> strictly prohibited.*
>
>
>
> *Any views or opinions presented in this email are solely those of the
> author and do not necessarily represent those of the organization. Any
> information on shares, debentures or similar instruments, recommended
> product pricing, valuations and the like are for information purposes only.
> It is not meant to be an instruction or recommendation, as the case may be,
> to buy or to sell securities, products, services nor an offer to buy or
> sell securities, products or services unless specifically stated to be so
> on behalf of the Flipkart group. Employees of the Flipkart group of
> companies are expressly required not to make defamatory statements and not
> to infringe or authorise any infringement of copyright or any other legal
> right by email communications. Any such communication is contrary to
> organizational policy and outside the scope of the employment of the
> individual concerned. The organization will not accept any liability in
> respect of such communication, and the employee responsible will be
> personally liable for any damages or other liability arising.*
>
>
>
> *Our organization accepts no liability for the content of this email, or
> for the consequences of any actions taken on the basis of the information *
> provided,* unless that information is subsequently confirmed in writing.
> If you are not the intended recipient, you are notified that disclosing,
> copying, distributing or taking any action in reliance on the contents of
> this information is strictly prohibited.*
>
>
> *-----------------------------------------------------------------------------------------*
>
>

Re: Backfilling From Kafka Topics With Varied Ingestion Rates

Posted by Akshay Aggarwal <ak...@flipkart.com>.
Thanks Aljoscha. Is there a JIRA where this is getting tracked?

~Akshay

On Wed, Feb 12, 2020 at 1:56 PM Aljoscha Krettek <al...@apache.org>
wrote:

> Hi,
>
> I'm afraid your analysis is 100% correct. Currently there's no
> out-of-box feature for dealing with this but our work on a new source
> interface ([1]) will enable us to add a feature that we call "event-time
> alignment" where source readers would slow down reading from certain
> source partitions if their watermark advances to far beyond the minimum
> watermark over all source partitions.
>
> Best,
> Aljoscha
>
> On 07.02.20 13:36, Akshay Aggarwal wrote:
> > Hi Flink Users,
> >
> > We have a scenario where we're reading from multiple kafka topics using a
> > single kafka consumer. Each topic has a very different ingestion rate,
> like
> > CheckoutTopic has 500 rec/sec, PageViewTopic has 10,000 rec/sec. We are
> > performing ordering of these events across topics using a keyed process
> > function (keyed on userId) and a EVENT_TIME watermark which is based on
> the
> > ingestionTs of the record captured just before it is produced into kafka.
> >
> > On live data this pipeline works perfectly, but if I restart the job to
> > process from an old savepoint (say 24hrs old), the job fills up the
> state,
> > a full back pressure (ratio 1) gets created on the source operators,
> > checkpoints start failing and the job eventually dies. My hypothesis is
> > that the data from both the topics are read at the max rate possible, but
> > since the watermark from the PageViewTopic will lag significantly behind
> > the CheckoutTopic overall watermarks don't progress, excessive data
> > from CheckoutTopic fills up the state and results in the failure
> mentioned
> > above.
> >
> > I also observed this while backfilling from a savepoint using a single
> > topic, even though watermarks do progress faster than before, the job has
> > the same fate. In this case I'm assuming the offsets/watermarks of the
> > individual partitions go out-of-sync with respect to time leading to a
> > similar situation mentioned above.
> >
> > Is this understanding correct? is there a known solution for this? And if
> > not, what is the suggested approach to tackle this problem?
> >
> > Thanks & Regards,
> > Akshay Aggarwal
> >
>

-- 



*-----------------------------------------------------------------------------------------*


*This email and any files transmitted with it are confidential and 
intended solely for the use of the individual or entity to whom they are 
addressed. If you have received this email in error, please notify the 
system manager. This message contains confidential information and is 
intended only for the individual named. If you are not the named addressee, 
you should not disseminate, distribute or copy this email. Please notify 
the sender immediately by email if you have received this email by mistake 
and delete this email from your system. If you are not the intended 
recipient, you are notified that disclosing, copying, distributing or 
taking any action in reliance on the contents of this information is 
strictly prohibited.*****

 ****

*Any views or opinions presented in this 
email are solely those of the author and do not necessarily represent those 
of the organization. Any information on shares, debentures or similar 
instruments, recommended product pricing, valuations and the like are for 
information purposes only. It is not meant to be an instruction or 
recommendation, as the case may be, to buy or to sell securities, products, 
services nor an offer to buy or sell securities, products or services 
unless specifically stated to be so on behalf of the Flipkart group. 
Employees of the Flipkart group of companies are expressly required not to 
make defamatory statements and not to infringe or authorise any 
infringement of copyright or any other legal right by email communications. 
Any such communication is contrary to organizational policy and outside the 
scope of the employment of the individual concerned. The organization will 
not accept any liability in respect of such communication, and the employee 
responsible will be personally liable for any damages or other liability 
arising.*****

 ****

*Our organization accepts no liability for the 
content of this email, or for the consequences of any actions taken on the 
basis of the information *provided,* unless that information is 
subsequently confirmed in writing. If you are not the intended recipient, 
you are notified that disclosing, copying, distributing or taking any 
action in reliance on the contents of this information is strictly 
prohibited.*


_-----------------------------------------------------------------------------------------_


Re: Backfilling From Kafka Topics With Varied Ingestion Rates

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,

I'm afraid your analysis is 100% correct. Currently there's no 
out-of-box feature for dealing with this but our work on a new source 
interface ([1]) will enable us to add a feature that we call "event-time 
alignment" where source readers would slow down reading from certain 
source partitions if their watermark advances to far beyond the minimum 
watermark over all source partitions.

Best,
Aljoscha

On 07.02.20 13:36, Akshay Aggarwal wrote:
> Hi Flink Users,
> 
> We have a scenario where we're reading from multiple kafka topics using a
> single kafka consumer. Each topic has a very different ingestion rate, like
> CheckoutTopic has 500 rec/sec, PageViewTopic has 10,000 rec/sec. We are
> performing ordering of these events across topics using a keyed process
> function (keyed on userId) and a EVENT_TIME watermark which is based on the
> ingestionTs of the record captured just before it is produced into kafka.
> 
> On live data this pipeline works perfectly, but if I restart the job to
> process from an old savepoint (say 24hrs old), the job fills up the state,
> a full back pressure (ratio 1) gets created on the source operators,
> checkpoints start failing and the job eventually dies. My hypothesis is
> that the data from both the topics are read at the max rate possible, but
> since the watermark from the PageViewTopic will lag significantly behind
> the CheckoutTopic overall watermarks don't progress, excessive data
> from CheckoutTopic fills up the state and results in the failure mentioned
> above.
> 
> I also observed this while backfilling from a savepoint using a single
> topic, even though watermarks do progress faster than before, the job has
> the same fate. In this case I'm assuming the offsets/watermarks of the
> individual partitions go out-of-sync with respect to time leading to a
> similar situation mentioned above.
> 
> Is this understanding correct? is there a known solution for this? And if
> not, what is the suggested approach to tackle this problem?
> 
> Thanks & Regards,
> Akshay Aggarwal
>