Posted to user@flink.apache.org by Gyula Fóra <gy...@apache.org> on 2017/03/17 09:26:13 UTC

Telling if a job has caught up with Kafka

Hi All,

I am wondering if anyone has some nice suggestions on what would be the
simplest/best way of telling if a job is caught up with the Kafka input.
An alternative question would be how to tell if a job is caught up to
another job reading from the same topic.

The first thing that comes to my mind is looking at the offsets Flink
commits to Kafka. However, this only works if every job uses a different
group id, and even then it is not very reliable, since it depends on the
commit frequency.

The use case I am trying to solve is a fault-tolerant update of a job:
taking a savepoint of job1, starting job2 from the savepoint, waiting until
job2 catches up, and then killing job1.

Thanks for your input!
Gyula
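
A minimal sketch of the "wait until it catches up" step in the use case above,
in Python. It assumes some way of reading job2's lag already exists: `get_lag`
is a hypothetical callable (it could wrap a metric query), and the threshold,
poll interval and timeout are illustrative defaults, not Flink settings.

```python
import time


def wait_until_caught_up(get_lag, threshold=0, poll_seconds=1.0,
                         timeout_seconds=600.0,
                         sleep=time.sleep, clock=time.monotonic):
    """Poll get_lag() until it is <= threshold or the timeout expires.

    get_lag is a hypothetical callable returning the new job's current lag
    in records. Returns True if the job caught up, False on timeout.
    sleep and clock are injectable so the loop can be tested without waiting.
    """
    deadline = clock() + timeout_seconds
    while True:
        if get_lag() <= threshold:
            return True
        if clock() >= deadline:
            return False
        sleep(poll_seconds)
```

Only once this returns True would the old job be cancelled; on False the old
job keeps running and the update can be retried or investigated.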

Re: Telling if a job has caught up with Kafka

Posted by aitozi <gj...@gmail.com>.
Hi, rmetzger0

Sorry to reply to this old question. I found that we use the Kafka client
0.9 in the class kafkaThread, which leads to the loss of many other detailed
metrics added in Kafka client 0.10, such as the per-partition consumer lag
mentioned in this issue: https://issues.apache.org/jira/browse/FLINK-7945.
I will try to add the detailed metrics.


rmetzger0 wrote
> Sorry for joining this discussion late, but there is already a metric for
> the offset lag in our 0.9+ consumers.
> Its called the "records-lag-max":
> https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring and
> its exposed via Flink's metrics system.
> The only issue is that it only shows the maximum lag across all
> partitions,
> not detailed per-partition metrics.






Re: Telling if a job has caught up with Kafka

Posted by Robert Metzger <rm...@apache.org>.
Sorry for joining this discussion late, but there is already a metric for
the offset lag in our 0.9+ consumers.
It's called "records-lag-max":
https://kafka.apache.org/documentation/#new_consumer_fetch_monitoring and
it's exposed via Flink's metrics system.
The only issue is that it shows only the maximum lag across all partitions,
not detailed per-partition metrics.
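
To illustrate what a maximum-only lag metric can and cannot tell you, here is
a small Python sketch. It is a simplified model, not how the Kafka client
computes "records-lag-max"; the function names and the sample lags are
assumptions made for the example.

```python
def max_lag(per_partition_lag):
    """Collapse per-partition lags (partition -> records behind) into one maximum."""
    return max(per_partition_lag.values(), default=0)


def caught_up(maximum_lag, threshold=0):
    """If the maximum lag is within the threshold, every partition is too."""
    return maximum_lag <= threshold


# Hypothetical lags for three partitions.
lags = {0: 0, 1: 1200, 2: 3}
print(max_lag(lags))             # 1200: partition 1 is behind, but the max does not say which
print(caught_up(max_lag(lags)))  # False
```

So a single maximum is enough to answer "has the job caught up?", but not to
find out which partition is lagging.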

On Mon, Mar 20, 2017 at 3:43 PM, Bruno Aranda <br...@gmail.com> wrote:

> Hi,
>
> Thanks! The proposal sounds very good to us too.
>
> Bruno
>
> On Sun, 19 Mar 2017 at 10:57 Florian König <fl...@micardo.com>
> wrote:
>
>> Thanks Gordon for the detailed explanation! That makes sense and explains
>> the expected behaviour.
>>
>> The JIRA for the new metric also sounds very good. Can’t wait to have
>> this in the Flink GUI (KafkaOffsetMonitor has some problems and stops
>> working after 1-2 days, don’t know the reason yet).
>>
>> All the best,
>> Florian
>>
>>
>> > On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
>> >
>> > @Florian
>> > the 0.9 / 0.10 version and 0.8 version behave a bit differently right
>> now for the offset committing.
>> >
>> > In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable”
>> etc. settings will be completely ignored and overwritten before used to
>> instantiate the internal Kafka clients, hence committing will only happen
>> on Flink checkpoints.
>> >
>> > In 0.8, this isn’t the case. Both automatic periodic committing and
>> committing on checkpoints can take place. That’s perhaps why you’re
>> observing the 0.8 consumer to be committing more frequently.
>> >
>> > FYI: This behaviour will be unified in Flink 1.3.0. If you’re
>> interested, you can take a look at
>> https://github.com/apache/flink/pull/3527.
>> >
>> > - Gordon
>> >
>> >
>> > On March 17, 2017 at 6:07:38 PM, Florian König (
>> florian.koenig@micardo.com) wrote:
>> >
>> >> Why is that so? The checkpoint contains the Kafka offset and would be
>> able to start reading wherever it left off, regardless of any offset stored
>> in Kafka or Zookeeper. Why is the offset not committed regularly,
>> independently from the checkpointing? Or did I misconfigure anything?
>>
>>
>>

Re: Telling if a job has caught up with Kafka

Posted by Bruno Aranda <br...@gmail.com>.
Hi,

Thanks! The proposal sounds very good to us too.

Bruno

On Sun, 19 Mar 2017 at 10:57 Florian König <fl...@micardo.com>
wrote:

> Thanks Gordon for the detailed explanation! That makes sense and explains
> the expected behaviour.
>
> The JIRA for the new metric also sounds very good. Can’t wait to have this
> in the Flink GUI (KafkaOffsetMonitor has some problems and stops working
> after 1-2 days, don’t know the reason yet).
>
> All the best,
> Florian
>
>
> > On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
> >
> > @Florian
> > the 0.9 / 0.10 version and 0.8 version behave a bit differently right
> now for the offset committing.
> >
> > In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable”
> etc. settings will be completely ignored and overwritten before used to
> instantiate the internal Kafka clients, hence committing will only happen
> on Flink checkpoints.
> >
> > In 0.8, this isn’t the case. Both automatic periodic committing and
> committing on checkpoints can take place. That’s perhaps why you’re
> observing the 0.8 consumer to be committing more frequently.
> >
> > FYI: This behaviour will be unified in Flink 1.3.0. If you’re
> interested, you can take a look at
> https://github.com/apache/flink/pull/3527.
> >
> > - Gordon
> >
> >
> > On March 17, 2017 at 6:07:38 PM, Florian König (
> florian.koenig@micardo.com) wrote:
> >
> >> Why is that so? The checkpoint contains the Kafka offset and would be
> able to start reading wherever it left off, regardless of any offset stored
> in Kafka or Zookeeper. Why is the offset not committed regularly,
> independently from the checkpointing? Or did I misconfigure anything?
>
>
>

Re: Telling if a job has caught up with Kafka

Posted by Florian König <fl...@micardo.com>.
Thanks Gordon for the detailed explanation! That makes sense and explains the expected behaviour.

The JIRA for the new metric also sounds very good. Can’t wait to have this in the Flink GUI (KafkaOffsetMonitor has some problems and stops working after 1-2 days, don’t know the reason yet).

All the best,
Florian


> On 18.03.2017 at 08:38, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
> 
> @Florian
> the 0.9 / 0.10 version and 0.8 version behave a bit differently right now for the offset committing.
> 
> In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable” etc. settings will be completely ignored and overwritten before being used to instantiate the internal Kafka clients, hence committing will only happen on Flink checkpoints.
> 
> In 0.8, this isn’t the case. Both automatic periodic committing and committing on checkpoints can take place. That’s perhaps why you’re observing the 0.8 consumer to be committing more frequently.
> 
> FYI: This behaviour will be unified in Flink 1.3.0. If you’re interested, you can take a look at https://github.com/apache/flink/pull/3527.
> 
> - Gordon
> 
> 
> On March 17, 2017 at 6:07:38 PM, Florian König (florian.koenig@micardo.com) wrote:
> 
>> Why is that so? The checkpoint contains the Kafka offset and would be able to start reading wherever it left off, regardless of any offset stored in Kafka or Zookeeper. Why is the offset not committed regularly, independently from the checkpointing? Or did I misconfigure anything?



Re: Telling if a job has caught up with Kafka

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
@Florian
the 0.9 / 0.10 version and 0.8 version behave a bit differently right now for the offset committing.

In 0.9 / 0.10, if checkpointing is enabled, the “auto.commit.enable” etc. settings will be completely ignored and overwritten before being used to instantiate the internal Kafka clients, hence committing will only happen on Flink checkpoints.

In 0.8, this isn’t the case. Both automatic periodic committing and committing on checkpoints can take place. That’s perhaps why you’re observing the 0.8 consumer to be committing more frequently.

FYI: This behaviour will be unified in Flink 1.3.0. If you’re interested, you can take a look at https://github.com/apache/flink/pull/3527.

- Gordon
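
The behaviour described above can be summarised as a small decision function.
This is only a Python model of the description in this message, not Flink's
code: the function and mode names are made up for the example, and the branch
for 0.9 / 0.10 without checkpointing is an assumption (the message does not
cover that case).

```python
def offset_commit_modes(connector_version, checkpointing_enabled,
                        auto_commit_enabled):
    """Return the set of ways offsets get committed back to Kafka / Zookeeper."""
    modes = set()
    if connector_version == "0.8":
        # 0.8: periodic auto-commit and commit-on-checkpoint can both happen.
        if auto_commit_enabled:
            modes.add("periodic")
        if checkpointing_enabled:
            modes.add("on_checkpoint")
    elif connector_version in ("0.9", "0.10"):
        if checkpointing_enabled:
            # Auto-commit settings are ignored; commits happen on checkpoints only.
            modes.add("on_checkpoint")
        elif auto_commit_enabled:
            # Assumption: without checkpointing, the client's auto-commit applies.
            modes.add("periodic")
    else:
        raise ValueError(f"unknown connector version: {connector_version}")
    return modes


print(offset_commit_modes("0.10", True, True))  # {'on_checkpoint'}
```

With the same settings, the 0.8 consumer would report both modes, which
matches the more frequent commits Florian observed.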


On March 17, 2017 at 6:07:38 PM, Florian König (florian.koenig@micardo.com) wrote:

Why is that so? The checkpoint contains the Kafka offset and would be able to start reading wherever it left off, regardless of any offset stored in Kafka or Zookeeper. Why is the offset not committed regularly, independently from the checkpointing? Or did I misconfigure anything? 

Re: Telling if a job has caught up with Kafka

Posted by Gyula Fóra <gy...@gmail.com>.
Thanks Gordon! :)

Gyula

On Sat, Mar 18, 2017, 08:26 Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:

> So we would have current lag per partition (for instance every 1 sec) and
> lag at the latest checkpoint per partition in an easily queryable way.

I quite like this idea! We could perhaps call them “currentOffsetLag” and
“lastCheckpointedOffsetLag”.
I’ve filed a JIRA to track this feature, and added some details there too:
https://issues.apache.org/jira/browse/FLINK-6109.

Cheers,
Gordon


On March 17, 2017 at 9:53:43 PM, Gyula Fóra (gyula.fora@gmail.com) wrote:

Hi Gordon,

Thanks for the suggestions, I think in general it would be good to make
this periodic (with a configurable interval), and also show the latest
committed (checkpointed) offset lag.
I think it's better to show both not only one of them as they both carry
useful information.

So we would have current lag per partition (for instance every 1 sec) and
lag at the latest checkpoint per partition in an easily queryable way.

Gyula

On Fri, Mar 17, 2017 at 14:24, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:

One other possibility for reporting “consumer lag” is to update the metric
only at a configurable interval, if use cases can tolerate a certain delay
in realizing the consumer has caught up.

Or we could also piggyback the consumer lag update onto the checkpoint
interval. I think in the case that Gyula described, users might additionally
want to stop the old job only when the new job has “caught up with partition
head” && “the offsets used to determine the lag are secured in a checkpoint”.
That should address syncing the consumer lag calculation with the commit
frequency discussed here.

What do you think?


On March 17, 2017 at 9:05:04 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org)
wrote:

Hi,

I was thinking somewhat similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
of the performance tradeoffs there (the partition metadata request
and records requests require 2 separate calls, so we would
basically be doubling the requests calls to Kafka just for this).

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?

- Gordon

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (uce@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? This we could also display in the web frontend.

In the first version, users would have to poll this metric manually.

[1]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <br...@gmail.com>
wrote:
> Hi,
>
> We are interested on this too. So far we flag the records with timestamps in
> different points of the pipeline and use metrics gauges to measure latency
> between the different components, but would be good to know if there is
> something more specific to Kafka that we can do out of the box in Flink.
>
> Cheers,
>
> Bruno
>
> On Fri, 17 Mar 2017 at 10:07 Florian König <fl...@micardo.com>
> wrote:
>>
>> Hi,
>>
>> thank you Gyula for posting that question. I’d also be interested in how
>> this could be done.
>>
>> You mentioned the dependency on the commit frequency. I’m using
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
>> a job's offsets as shown in the diagrams updated a lot more regularly than
>> the checkpointing interval. With the 10 consumer a commit is only made after
>> a successful checkpoint (or so it seems).
>>
>> Why is that so? The checkpoint contains the Kafka offset and would be able
>> to start reading wherever it left off, regardless of any offset stored in
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently
>> from the checkpointing? Or did I misconfigure anything?
>>
>> Thanks
>> Florian
>>
>> > On 17.03.2017 at 10:26, Gyula Fóra <gy...@apache.org> wrote:
>> >
>> > Hi All,
>> >
>> > I am wondering if anyone has some nice suggestions on what would be the
>> > simplest/best way of telling if a job is caught up with the Kafka input.
>> > An alternative question would be how to tell if a job is caught up to
>> > another job reading from the same topic.
>> >
>> > The first thing that comes to my mind is looking at the offsets Flink
>> > commits to Kafka. However this will only work if every job uses a different
>> > group id and even then it is not very reliable depending on the commit
>> > frequency.
>> >
>> > The use case I am trying to solve is fault tolerant update of a job, by
>> > taking a savepoint for job1 starting job2 from the savepoint, waiting until
>> > it catches up and then killing job1.
>> >
>> > Thanks for your input!
>> > Gyula
>>
>>
>

Re: Telling if a job has caught up with Kafka

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
> So we would have current lag per partition (for instance every 1 sec) and lag at the latest checkpoint per partition in an easily queryable way.

I quite like this idea! We could perhaps call them “currentOffsetLag” and “lastCheckpointedOffsetLag”.

I’ve filed a JIRA to track this feature, and added some details there too: https://issues.apache.org/jira/browse/FLINK-6109.

Cheers,
Gordon
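
As a rough illustration of the two proposed per-partition metrics, here is a
Python sketch. Only the metric names “currentOffsetLag” and
“lastCheckpointedOffsetLag” come from this thread; the `PartitionOffsets`
type, its fields and the sample numbers are assumptions for the example, and
the JIRA may define the metrics differently.

```python
from dataclasses import dataclass


@dataclass
class PartitionOffsets:
    latest: int        # head (latest) offset of the partition
    current: int       # offset the consumer has read up to
    checkpointed: int  # offset stored in the last completed checkpoint


def lag_metrics(offsets):
    """Compute both lags for one partition, clamped at zero."""
    return {
        "currentOffsetLag": max(offsets.latest - offsets.current, 0),
        "lastCheckpointedOffsetLag": max(offsets.latest - offsets.checkpointed, 0),
    }


print(lag_metrics(PartitionOffsets(latest=100, current=90, checkpointed=70)))
# {'currentOffsetLag': 10, 'lastCheckpointedOffsetLag': 30}
```

The checkpointed lag can never be smaller than the current lag, because the
checkpointed offset trails the consumer's read position.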

On March 17, 2017 at 9:53:43 PM, Gyula Fóra (gyula.fora@gmail.com) wrote:

Hi Gordon,

Thanks for the suggestions, I think in general it would be good to make this periodic (with a configurable interval), and also show the latest committed (checkpointed) offset lag.
I think it's better to show both not only one of them as they both carry useful information.

So we would have current lag per partition (for instance every 1 sec) and lag at the latest checkpoint per partition in an easily queryable way.

Gyula

On Fri, Mar 17, 2017 at 14:24, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:
One other possibility for reporting “consumer lag” is to update the metric only at a
configurable interval, if use cases can tolerate a certain delay in realizing the consumer
has caught up.

Or we could also piggyback the consumer lag update onto the checkpoint interval.
I think in the case that Gyula described, users might additionally want
to stop the old job only when the new job has “caught up with partition head” &&
“the offsets used to determine the lag are secured in a checkpoint”. That should
address syncing the consumer lag calculation with the commit frequency discussed here.

What do you think?


On March 17, 2017 at 9:05:04 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org) wrote:

Hi,

I was thinking somewhat similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
of the performance tradeoffs there (the partition metadata request
and records requests require 2 separate calls, so we would
basically be doubling the requests calls to Kafka just for this).

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?

- Gordon

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (uce@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? This we could also display in the web frontend.

In the first version, users would have to poll this metric manually.

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <br...@gmail.com> wrote:
> Hi,
>
> We are interested on this too. So far we flag the records with timestamps in
> different points of the pipeline and use metrics gauges to measure latency
> between the different components, but would be good to know if there is
> something more specific to Kafka that we can do out of the box in Flink.
>
> Cheers,
>
> Bruno
>
> On Fri, 17 Mar 2017 at 10:07 Florian König <fl...@micardo.com>
> wrote:
>>
>> Hi,
>>
>> thank you Gyula for posting that question. I’d also be interested in how
>> this could be done.
>>
>> You mentioned the dependency on the commit frequency. I’m using
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
>> a job's offsets as shown in the diagrams updated a lot more regularly than
>> the checkpointing interval. With the 10 consumer a commit is only made after
>> a successful checkpoint (or so it seems).
>>
>> Why is that so? The checkpoint contains the Kafka offset and would be able
>> to start reading wherever it left off, regardless of any offset stored in
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently
>> from the checkpointing? Or did I misconfigure anything?
>>
>> Thanks
>> Florian
>>
>> > On 17.03.2017 at 10:26, Gyula Fóra <gy...@apache.org> wrote:
>> >
>> > Hi All,
>> >
>> > I am wondering if anyone has some nice suggestions on what would be the
>> > simplest/best way of telling if a job is caught up with the Kafka input.
>> > An alternative question would be how to tell if a job is caught up to
>> > another job reading from the same topic.
>> >
>> > The first thing that comes to my mind is looking at the offsets Flink
>> > commits to Kafka. However this will only work if every job uses a different
>> > group id and even then it is not very reliable depending on the commit
>> > frequency.
>> >
>> > The use case I am trying to solve is fault tolerant update of a job, by
>> > taking a savepoint for job1 starting job2 from the savepoint, waiting until
>> > it catches up and then killing job1.
>> >
>> > Thanks for your input!
>> > Gyula
>>
>>
>

Re: Telling if a job has caught up with Kafka

Posted by Gyula Fóra <gy...@gmail.com>.
Hi Gordon,

Thanks for the suggestions. I think in general it would be good to make
this periodic (with a configurable interval), and also show the latest
committed (checkpointed) offset lag.
I think it's better to show both rather than only one of them, as they both
carry useful information.

So we would have current lag per partition (for instance every 1 sec) and
lag at the latest checkpoint per partition in an easily queryable way.

Gyula

On Fri, Mar 17, 2017 at 14:24, Tzu-Li (Gordon) Tai <tz...@apache.org> wrote:

> One other possibility for reporting “consumer lag” is to update the metric
> only at a configurable interval, if use cases can tolerate a certain delay
> in realizing the consumer has caught up.
>
> Or we could also piggyback the consumer lag update onto the checkpoint
> interval. I think in the case that Gyula described, users might additionally
> want to stop the old job only when the new job has “caught up with partition
> head” && “the offsets used to determine the lag are secured in a checkpoint”.
> That should address syncing the consumer lag calculation with the commit
> frequency discussed here.
>
> What do you think?
>
>
> On March 17, 2017 at 9:05:04 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org)
> wrote:
>
> Hi,
>
> I was thinking somewhat similar to what Ufuk suggested,
> but if we want to report a “consumer lag” metric, we would
> essentially need to request the latest offset on every record fetch
> (because the latest offset advances as well), so I wasn’t so sure
> of the performance tradeoffs there (the partition metadata request
> and records requests require 2 separate calls, so we would
> basically be doubling the requests calls to Kafka just for this).
>
> If we just want a metric that can show whether or not the
> consumer has caught up with the “latest offset at the time the
> consumer starts”, it would definitely be feasible. I wonder
> how we want to name this metric though.
> @Gyula @Florian @Bruno do you think this is enough for your needs?
>
> - Gordon
>
> On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (uce@apache.org) wrote:
>
> @Gordon: What's your take on integrating this directly into the
> consumer? Can't we poll the latest offset via the Offset API [1] and
> report a consumer lag metric for the consumer group of the
> application? This we could also display in the web frontend.
>
> In the first version, users would have to poll this metric manually.
>
> [1]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest
>
> On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <br...@gmail.com>
> wrote:
> > Hi,
> >
> > We are interested on this too. So far we flag the records with timestamps in
> > different points of the pipeline and use metrics gauges to measure latency
> > between the different components, but would be good to know if there is
> > something more specific to Kafka that we can do out of the box in Flink.
> >
> > Cheers,
> >
> > Bruno
> >
> > On Fri, 17 Mar 2017 at 10:07 Florian König <fl...@micardo.com>
> > wrote:
> >>
> >> Hi,
> >>
> >> thank you Gyula for posting that question. I’d also be interested in how
> >> this could be done.
> >>
> >> You mentioned the dependency on the commit frequency. I’m using
> >> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
> >> a job's offsets as shown in the diagrams updated a lot more regularly than
> >> the checkpointing interval. With the 10 consumer a commit is only made after
> >> a successful checkpoint (or so it seems).
> >>
> >> Why is that so? The checkpoint contains the Kafka offset and would be able
> >> to start reading wherever it left off, regardless of any offset stored in
> >> Kafka or Zookeeper. Why is the offset not committed regularly, independently
> >> from the checkpointing? Or did I misconfigure anything?
> >>
> >> Thanks
> >> Florian
> >>
> >> > On 17.03.2017 at 10:26, Gyula Fóra <gy...@apache.org> wrote:
> >> >
> >> > Hi All,
> >> >
> >> > I am wondering if anyone has some nice suggestions on what would be the
> >> > simplest/best way of telling if a job is caught up with the Kafka input.
> >> > An alternative question would be how to tell if a job is caught up to
> >> > another job reading from the same topic.
> >> >
> >> > The first thing that comes to my mind is looking at the offsets Flink
> >> > commits to Kafka. However this will only work if every job uses a different
> >> > group id and even then it is not very reliable depending on the commit
> >> > frequency.
> >> >
> >> > The use case I am trying to solve is fault tolerant update of a job, by
> >> > taking a savepoint for job1 starting job2 from the savepoint, waiting until
> >> > it catches up and then killing job1.
> >> >
> >> > Thanks for your input!
> >> > Gyula
> >>
> >>
> >
>
>

Re: Telling if a job has caught up with Kafka

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
One other possibility for reporting “consumer lag” is to update the metric only at a
configurable interval, if use cases can tolerate a certain delay in realizing the consumer
has caught up.

Or we could also piggyback the consumer lag update onto the checkpoint interval.
I think in the case that Gyula described, users might additionally want
to stop the old job only when the new job has “caught up with partition head” &&
“the offsets used to determine the lag are secured in a checkpoint”. That should
address syncing the consumer lag calculation with the commit frequency discussed here.

What do you think?
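
The combined condition above could be checked roughly like this. This is a
Python sketch under assumptions: the function name, the partition -> offset
dictionaries and the `max_lag` tolerance are illustrative, and how the
checkpointed offsets would actually be obtained is left open.

```python
def safe_to_stop_old_job(latest_offsets, checkpointed_offsets, max_lag=0):
    """True only if every partition's checkpointed offset is at the head.

    latest_offsets:       partition -> latest (head) offset
    checkpointed_offsets: partition -> offset secured in the new job's
                          last completed checkpoint
    A partition with no checkpointed offset yet counts as not caught up.
    """
    for partition, latest in latest_offsets.items():
        checkpointed = checkpointed_offsets.get(partition)
        if checkpointed is None or latest - checkpointed > max_lag:
            return False
    return True
```

Because the lag is computed from checkpointed offsets rather than the live
read position, "caught up" here also implies the progress is durable.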

On March 17, 2017 at 9:05:04 PM, Tzu-Li (Gordon) Tai (tzulitai@apache.org) wrote:

Hi,

I was thinking somewhat similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
of the performance tradeoffs there (the partition metadata request
and records requests require 2 separate calls, so we would
basically be doubling the requests calls to Kafka just for this).

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?

- Gordon

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (uce@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? This we could also display in the web frontend.

In the first version, users would have to poll this metric manually.

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <br...@gmail.com> wrote:
> Hi,
>
> We are interested on this too. So far we flag the records with timestamps in
> different points of the pipeline and use metrics gauges to measure latency
> between the different components, but would be good to know if there is
> something more specific to Kafka that we can do out of the box in Flink.
>
> Cheers,
>
> Bruno
>
> On Fri, 17 Mar 2017 at 10:07 Florian König <fl...@micardo.com>
> wrote:
>>
>> Hi,
>>
>> thank you Gyula for posting that question. I’d also be interested in how
>> this could be done.
>>
>> You mentioned the dependency on the commit frequency. I’m using
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer
>> a job's offsets as shown in the diagrams updated a lot more regularly than
>> the checkpointing interval. With the 10 consumer a commit is only made after
>> a successful checkpoint (or so it seems).
>>
>> Why is that so? The checkpoint contains the Kafka offset and would be able
>> to start reading wherever it left off, regardless of any offset stored in
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently
>> from the checkpointing? Or did I misconfigure anything?
>>
>> Thanks
>> Florian
>>
>> > On 17.03.2017 at 10:26, Gyula Fóra <gy...@apache.org> wrote:
>> >
>> > Hi All,
>> >
>> > I am wondering if anyone has some nice suggestions on what would be the
>> > simplest/best way of telling if a job is caught up with the Kafka input.
>> > An alternative question would be how to tell if a job is caught up to
>> > another job reading from the same topic.
>> >
>> > The first thing that comes to my mind is looking at the offsets Flink
>> > commits to Kafka. However this will only work if every job uses a different
>> > group id and even then it is not very reliable depending on the commit
>> > frequency.
>> >
>> > The use case I am trying to solve is fault tolerant update of a job, by
>> > taking a savepoint for job1 starting job2 from the savepoint, waiting until
>> > it catches up and then killing job1.
>> >
>> > Thanks for your input!
>> > Gyula
>>
>>
>

Re: Telling if a job has caught up with Kafka

Posted by "Tzu-Li (Gordon) Tai" <tz...@apache.org>.
Hi,

I was thinking of something similar to what Ufuk suggested,
but if we want to report a “consumer lag” metric, we would
essentially need to request the latest offset on every record fetch
(because the latest offset advances as well), so I wasn’t so sure
about the performance tradeoffs there (the partition metadata request
and the records request are 2 separate calls, so we would
basically be doubling the request calls to Kafka just for this).

If we just want a metric that can show whether or not the
consumer has caught up with the “latest offset at the time the
consumer starts”, it would definitely be feasible. I wonder
how we want to name this metric though.
@Gyula @Florian @Bruno do you think this is enough for your needs?

- Gordon
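
The cheaper variant, "caught up with the latest offset at the time the
consumer starts", needs only one snapshot of the head offsets. A minimal
Python sketch follows; the class and method names are hypothetical, and
offsets are treated as "next offset to read / write" counters.

```python
class StartupCatchUpTracker:
    """Tracks whether a consumer has reached the head offsets seen at startup."""

    def __init__(self, latest_offsets_at_start):
        # One-time snapshot: partition -> latest offset when the consumer started.
        self._targets = dict(latest_offsets_at_start)
        self._positions = {}

    def record_position(self, partition, next_offset):
        """Call after each fetch with the next offset to be read."""
        self._positions[partition] = next_offset

    def caught_up(self):
        # No further requests to Kafka: compare against the startup snapshot only.
        return all(self._positions.get(p, 0) >= target
                   for p, target in self._targets.items())


tracker = StartupCatchUpTracker({0: 10, 1: 5})
tracker.record_position(0, 10)
print(tracker.caught_up())  # False: partition 1 has not been read yet
tracker.record_position(1, 7)
print(tracker.caught_up())  # True
```

The trade-off is the one described above: this says nothing about records
that arrived after startup, so it is not a true consumer lag.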

On March 17, 2017 at 8:51:49 PM, Ufuk Celebi (uce@apache.org) wrote:

@Gordon: What's your take on integrating this directly into the  
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the  
application? This we could also display in the web frontend.  

In the first version, users would have to poll this metric manually.  

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest  

On Fri, Mar 17, 2017 at 11:23 AM, Bruno Aranda <br...@gmail.com> wrote:  
> Hi,  
>  
> We are interested on this too. So far we flag the records with timestamps in  
> different points of the pipeline and use metrics gauges to measure latency  
> between the different components, but would be good to know if there is  
> something more specific to Kafka that we can do out of the box in Flink.  
>  
> Cheers,  
>  
> Bruno  
>  
> On Fri, 17 Mar 2017 at 10:07 Florian König <fl...@micardo.com>  
> wrote:  
>>  
>> Hi,  
>>  
>> thank you Gyula for posting that question. I’d also be interested in how  
>> this could be done.  
>>  
>> You mentioned the dependency on the commit frequency. I’m using  
>> https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer  
>> a job's offsets as shown in the diagrams updated a lot more regularly than  
>> the checkpointing interval. With the 10 consumer a commit is only made after  
>> a successful checkpoint (or so it seems).  
>>  
>> Why is that so? The checkpoint contains the Kafka offset and would be able  
>> to start reading wherever it left off, regardless of any offset stored in  
>> Kafka or Zookeeper. Why is the offset not committed regularly, independently  
>> from the checkpointing? Or did I misconfigure anything?  
>>  
>> Thanks  
>> Florian  
>>  
>> > On 17.03.2017 at 10:26, Gyula Fóra <gy...@apache.org> wrote:
>> >  
>> > Hi All,  
>> >  
>> > I am wondering if anyone has some nice suggestions on what would be the  
>> > simplest/best way of telling if a job is caught up with the Kafka input.  
>> > An alternative question would be how to tell if a job is caught up to  
>> > another job reading from the same topic.  
>> >  
>> > The first thing that comes to my mind is looking at the offsets Flink  
>> > commits to Kafka. However this will only work if every job uses a different  
>> > group id and even then it is not very reliable depending on the commit  
>> > frequency.  
>> >  
>> > The use case I am trying to solve is fault tolerant update of a job, by  
>> > taking a savepoint for job1 starting job2 from the savepoint, waiting until  
>> > it catches up and then killing job1.  
>> >  
>> > Thanks for your input!  
>> > Gyula  
>>  
>>  
>  

Re: Telling if a job has caught up with Kafka

Posted by Ufuk Celebi <uc...@apache.org>.
@Gordon: What's your take on integrating this directly into the
consumer? Can't we poll the latest offset via the Offset API [1] and
report a consumer lag metric for the consumer group of the
application? We could also display this in the web frontend.

In the first version, users would have to poll this metric manually.

[1] https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-TopicMetadataRequest
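
To make the idea concrete, here is a minimal sketch of the lag arithmetic
only. It assumes the latest offset per partition and the group's committed
offset per partition have already been fetched by some other means; the
names consumer_lag and is_caught_up are hypothetical and not part of Flink
or Kafka.

```python
from typing import Dict, Optional, Tuple

# (topic, partition) pair; a plain tuple stands in for a client-specific type.
TopicPartition = Tuple[str, int]


def consumer_lag(
    latest: Dict[TopicPartition, int],
    committed: Dict[TopicPartition, Optional[int]],
) -> Dict[TopicPartition, int]:
    """Per-partition lag: latest offset minus the group's committed offset."""
    lag = {}
    for tp, end_offset in latest.items():
        current = committed.get(tp)
        if current is None:
            # Assumption: with no committed offset, count the whole partition
            # as unread. This ignores a log start offset greater than zero.
            current = 0
        # Clamp at zero in case the two values were sampled at different times.
        lag[tp] = max(end_offset - current, 0)
    return lag


def is_caught_up(lag: Dict[TopicPartition, int], threshold: int = 0) -> bool:
    """True if no partition is more than `threshold` records behind."""
    return all(value <= threshold for value in lag.values())
```

Taking the maximum over the returned values gives a single number per job
that could be exposed as a gauge. Because it is derived from committed
offsets, it is only as fresh as the last commit.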

Re: Telling if a job has caught up with Kafka

Posted by Bruno Aranda <br...@gmail.com>.
Hi,

We are interested in this too. So far we flag the records with timestamps
at different points of the pipeline and use metrics gauges to measure
latency between the different components, but it would be good to know if
there is something more specific to Kafka that we can do out of the box in
Flink.

Cheers,

Bruno
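
As a rough illustration of the timestamp approach described above (not the
actual pipeline code), the sketch below assumes each record carries a
mapping from stage name to the time in milliseconds at which it passed that
stage. The function name and stage names are hypothetical.

```python
from typing import Dict, List, Tuple


def stage_latencies(
    stamps: Dict[str, int], order: List[str]
) -> Dict[Tuple[str, str], int]:
    """Latency in milliseconds between each pair of consecutive stages."""
    # Pair every stage with the one after it and subtract the timestamps.
    return {
        (earlier, later): stamps[later] - stamps[earlier]
        for earlier, later in zip(order, order[1:])
    }


# Example record stamped at three hypothetical points of the pipeline.
record_stamps = {"source": 1000, "enrich": 1250, "sink": 2000}
latencies = stage_latencies(record_stamps, ["source", "enrich", "sink"])
```

Each value in the result is what a gauge for that pair of components would
report for the most recent record.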

Re: Telling if a job has caught up with Kafka

Posted by Florian König <fl...@micardo.com>.
Hi,

thank you Gyula for posting that question. I’d also be interested in how this could be done.

You mentioned the dependency on the commit frequency. I’m using https://github.com/quantifind/KafkaOffsetMonitor. With the 08 Kafka consumer, a job's offsets, as shown in the diagrams, were updated a lot more regularly than the checkpointing interval. With the 10 consumer, a commit is only made after a successful checkpoint (or so it seems).

Why is that so? The checkpoint contains the Kafka offset and would be able to start reading wherever it left off, regardless of any offset stored in Kafka or Zookeeper. Why is the offset not committed regularly, independently from the checkpointing? Or did I misconfigure anything?

Thanks
Florian
