Posted to user@flink.apache.org by Balaji Rajagopalan <ba...@olacabs.com> on 2016/05/04 08:24:09 UTC

reading from latest kafka offset when flink starts

I am using the Flink connector to read from a Kafka stream. I ran into a
problem where the Flink job went down due to an application error and stayed
down for some time; meanwhile the Kafka queue kept growing, as expected, since
there was no consumer for the given group. When I restarted Flink it began
consuming the messages, no problem so far, but the consumer lag was huge
because the producer is fast, about 4500 events/sec. My question: is there any
Flink connector configuration which can force it to read from the latest
offset when the Flink application starts, since in my application logic I do
not care about older events?

balaji

Re: reading from latest kafka offset when flink starts

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
No, I am using Kafka 0.8.0.2. I did some experiments with changing the
parallelism from 4 to 16; the lag has come down from 2 hours to 20 minutes and
the CPU utilization (load avg) has gone up from 20-30% to 50-60%, so
parallelism does seem to play a role in reducing the processing lag in
flink as I expected.
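
For reference, a minimal sketch of the two ways this parallelism can be set
with the Java DataStream API, either globally for the job or on a single
operator (the toy pipeline below is only an illustration, not the actual job):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Job-wide default: operators that do not override it run with 16 subtasks.
        env.setParallelism(16);

        env.fromElements("a", "b", "c")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            .setParallelism(16) // per-operator override, e.g. only for the heavy stage
            .print();

        env.execute("parallelism sketch");
    }
}

Keep in mind that the job also needs enough free task slots in the cluster to
actually run at the requested parallelism.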


Re: reading from latest kafka offset when flink starts

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
are you per chance using Kafka 0.9?

Cheers,
Aljoscha


Re: reading from latest kafka offset when flink starts

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Robert,
  Regarding the event rate, 4500 events/sec may not be a large number, but I am
seeing issues processing the events with the processing power I am using. I
have deployed the Flink app on a 3-node YARN cluster: one node is the master
and the 2 slave nodes run the TaskManagers. Each machine has 2 cores and 4 GB
RAM, with the default parallelism set to 4. I see a delay of 2 hours from the
time an event enters the system to the time it reaches the sink; the events
appear to be checkpointed but processed with a huge delay inside Flink. Is
there any recommendation (wiki write-up) for the number of TaskManager slots
required to process a certain load of incoming data?

balaji
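
For context, the slot count in question is usually expressed when the YARN
session is started; a rough rule of thumb is one slot per CPU core on each
TaskManager, with enough total slots to cover the job's parallelism. A sketch
matching the 2-slave, 2-core, 4 GB setup described above (the flag and memory
values are illustrative only):

# 2 TaskManagers (one per slave node) with 2 slots each = 4 slots in total,
# which covers a default parallelism of 4. Memory values are examples only.
./bin/yarn-session.sh -n 2 -s 2 -jm 1024 -tm 2048

# The same can be kept as a standing setting in conf/flink-conf.yaml:
#   taskmanager.numberOfTaskSlots: 2
#   parallelism.default: 4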


Re: reading from latest kafka offset when flink starts

Posted by Ufuk Celebi <uc...@apache.org>.
Robert, what do you think about adding a note about this to the Kafka
consumer docs? This has come up a couple of times on the mailing list
already.

– Ufuk


Re: reading from latest kafka offset when flink starts

Posted by Balaji Rajagopalan <ba...@olacabs.com>.
Thanks Robert, appreciate your help.


Re: reading from latest kafka offset when flink starts

Posted by Robert Metzger <rm...@apache.org>.
Hi,

yes, you can use Kafka's configuration setting for that. It's called
"auto.offset.reset". Setting it to "latest" makes the consumer restart from the
current offset ("earliest" is the opposite).

How heavy is the processing you are doing? 4500 events/second does not sound
like a lot of throughput.
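
As a concrete sketch of the above with the Kafka 0.8 connector (broker address,
ZooKeeper address, topic, and group id are placeholders; note that the old 0.8
consumer configuration historically uses "largest"/"smallest" rather than
"latest"/"earliest", so the exact value is worth checking against the Kafka
version in use):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class ReadFromLatestOffset {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        props.setProperty("zookeeper.connect", "localhost:2181"); // required by the 0.8 connector
        props.setProperty("group.id", "my-flink-group");          // placeholder consumer group
        // Start from the newest records when no committed offset is found.
        props.setProperty("auto.offset.reset", "latest");

        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer08<String>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("read from latest kafka offset");
    }
}

Also note that if the job is restored from a Flink checkpoint or savepoint, the
offsets stored in that state take precedence over "auto.offset.reset".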
