Posted to users@kafka.apache.org by Manan G <ma...@gmail.com> on 2017/10/12 22:31:48 UTC

Figuring out lag within Java consumer application

For my use case, I need to figure out the lag within the Java consumer
itself that is consuming some topic. Ideally, the consumer application
would monitor the lag every minute or so and take some action on its own if
the consumer falls behind (e.g. spin up more threads to process records; my
use case does not care about record order). For our purpose, it is OK if
the lag information is a bit stale.

* AFAIK, the Java KafkaConsumer APIs do not seem to directly expose any
information from which I can compute the lag within my Java consumer
application.

* Alternatively, it seems I can create a separate KafkaConsumer (or
possibly reuse the existing KafkaConsumer my consumer application is
using), seek to the end, and call the "position()" API to get the end
offsets for all partitions I am interested in. Since my consumer
application already knows which offset the consumer is at, I can then
compute the lag. However, for such a basic piece of information, this
solution is a bit involved for our framework, for various reasons. It also
requires either a separate KafkaConsumer just to figure out the lag, or
reusing the same KafkaConsumer our application consumes with and
implementing logic to seek to the end, read the end offset, and seek back
(I'm not sure yet whether that is feasible without issues).
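The seek-and-restore variant described in that bullet could look roughly like this. It is a minimal sketch, assuming a hypothetical SeekableConsumer interface that stands in for the real KafkaConsumer methods position(TopicPartition), seek(TopicPartition, long) and seekToEnd(Collection<TopicPartition>), with partitions written as plain "topic-partition" strings so the code runs without a broker. It saves each position, seeks to the end, reads the end offsets, and always seeks back. Two caveats: KafkaConsumer is not thread-safe, so this has to run on the thread that calls poll(), and (as far as I understand the client) seeking can make the consumer drop records it has already fetched and buffered, so they get fetched again.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the KafkaConsumer methods position(TopicPartition),
// seek(TopicPartition, long) and seekToEnd(Collection<TopicPartition>).
// Partitions are plain "topic-partition" strings to keep the sketch self-contained.
interface SeekableConsumer {
    long position(String partition);

    void seek(String partition, long offset);

    void seekToEnd(Collection<String> partitions);
}

final class SeekAndRestoreLag {
    private SeekAndRestoreLag() {}

    // Returns lag per partition (end offset minus the position before the call) and
    // leaves every partition at its original position. Call it only from the thread
    // that polls the consumer, since KafkaConsumer is not thread-safe.
    static Map<String, Long> lag(SeekableConsumer consumer, Collection<String> partitions) {
        // Remember where the application currently is on each partition.
        Map<String, Long> saved = new HashMap<>();
        for (String p : partitions) {
            saved.put(p, consumer.position(p));
        }
        Map<String, Long> lag = new HashMap<>();
        try {
            consumer.seekToEnd(partitions);
            for (String p : partitions) {
                // After seekToEnd, position() reports the end offset.
                lag.put(p, consumer.position(p) - saved.get(p));
            }
        } finally {
            // Seek back even if a lookup failed, so consumption resumes where it left off.
            for (Map.Entry<String, Long> e : saved.entrySet()) {
                consumer.seek(e.getKey(), e.getValue());
            }
        }
        return lag;
    }
}
```

In a real application the strings would be TopicPartition objects and the interface would simply be the KafkaConsumer itself.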

* If I am not mistaken, looking at the KafkaConsumer code itself, the "high
watermark" information already seems to be present in FetchResponse
(FetchResponse.PartitionData); it just isn't exposed. Is there any way to
retrieve this information in my consumer application?

In general, wouldn't it be useful for a consumer application to have lag
information available through a simple API call?

Thanks,
M

Re: Figuring out lag within Java consumer application

Posted by Stas Chizhov <sc...@gmail.com>.
Hi,

You can get the lag as a metric here:
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#metrics()

BR,
Stas.

2017-10-13 2:08 GMT+02:00 Stephen Powis <sp...@salesforce.com>:

> So I have the same use case as the original poster and had the same issue
> with the older 0.10.x clients: not being able to determine the tail
> offsets even though the fetch response contains the high watermark (HW).
>
> From what I could understand by tracing through the 0.11.0 consumer code,
> it makes additional API/network calls to the Kafka cluster to retrieve the
> tail/end offset information. Assuming I haven't misread the code, this
> probably makes sense for most use cases. But in time-sensitive code, it
> bummed me out to have to make additional calls to get information that is
> technically already available via the HW property in the fetch responses;
> the consumer just has no access to it.
>
> Is there any talk about exposing this property somewhere in the consumer
> in the future?
>
> On Fri, Oct 13, 2017 at 8:35 AM, Manan G <ma...@gmail.com> wrote:
>
> > NM. 0.11 KafkaConsumer seems to have added "endOffsets" API!

Re: Figuring out lag within Java consumer application

Posted by Stephen Powis <sp...@salesforce.com>.
So I have the same use case as the original poster and had the same issue
with the older 0.10.x clients: not being able to determine the tail
offsets even though the fetch response contains the high watermark (HW).

From what I could understand by tracing through the 0.11.0 consumer code,
it makes additional API/network calls to the Kafka cluster to retrieve the
tail/end offset information. Assuming I haven't misread the code, this
probably makes sense for most use cases. But in time-sensitive code, it
bummed me out to have to make additional calls to get information that is
technically already available via the HW property in the fetch responses;
the consumer just has no access to it.

Is there any talk about exposing this property somewhere in the consumer
in the future?
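If, as the original poster said, slightly stale lag is acceptable, one way to bound those extra round trips is to cache the end offsets and refresh them at most once per interval. This is a minimal sketch, assuming a hypothetical fetcher (standing in for something like consumer.endOffsets(partitions)) and an injected millisecond clock so it can be exercised without a broker. It does not make the fetch-response high watermark accessible; it only limits how often the extra lookup happens.

```java
import java.util.Map;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Caches end offsets and refreshes them at most once per maxAgeMillis, trading a bit
// of staleness for fewer broker round trips. The fetcher is a hypothetical stand-in for
// a call such as consumer.endOffsets(partitions); the clock is injected for testing.
// Not thread-safe: call get() from the thread that owns the consumer.
final class CachedEndOffsets {
    private final Supplier<Map<String, Long>> fetcher;
    private final LongSupplier clockMillis;
    private final long maxAgeMillis;
    private Map<String, Long> cached; // null until the first fetch
    private long fetchedAtMillis;

    CachedEndOffsets(Supplier<Map<String, Long>> fetcher, LongSupplier clockMillis, long maxAgeMillis) {
        this.fetcher = fetcher;
        this.clockMillis = clockMillis;
        this.maxAgeMillis = maxAgeMillis;
    }

    // Returns the cached end offsets, fetching fresh ones only if the cache is empty
    // or at least maxAgeMillis old.
    Map<String, Long> get() {
        long now = clockMillis.getAsLong();
        if (cached == null || now - fetchedAtMillis >= maxAgeMillis) {
            cached = fetcher.get();
            fetchedAtMillis = now;
        }
        return cached;
    }
}
```

With maxAgeMillis set to 60000 this lines up with the "every minute or so" check from the original post; outside of tests the clock could simply be System::currentTimeMillis.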

On Fri, Oct 13, 2017 at 8:35 AM, Manan G <ma...@gmail.com> wrote:

> NM. 0.11 KafkaConsumer seems to have added "endOffsets" API!

Re: Figuring out lag within Java consumer application

Posted by Manan G <ma...@gmail.com>.
NM. The 0.11 KafkaConsumer seems to have added an "endOffsets" API!
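With endOffsets(), the lag can be computed without moving the consumer's position, so no seek-and-restore is needed. (As far as I know, KafkaConsumer.endOffsets(Collection<TopicPartition>) actually arrived in 0.10.1.0 via KIP-79, together with beginningOffsets() and offsetsForTimes().) This is a minimal sketch, assuming a hypothetical OffsetLookup interface that mirrors endOffsets() and position(TopicPartition), with partitions as "topic-partition" strings so it runs without a broker.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for KafkaConsumer.endOffsets(Collection<TopicPartition>) and
// KafkaConsumer.position(TopicPartition); partitions are "topic-partition" strings here.
interface OffsetLookup {
    Map<String, Long> endOffsets(Collection<String> partitions);

    long position(String partition);
}

final class EndOffsetsLag {
    private EndOffsetsLag() {}

    // Lag per partition = end offset - current position. endOffsets() does not change
    // the consumer's position, so nothing has to be restored afterwards.
    static Map<String, Long> lag(OffsetLookup consumer, Collection<String> partitions) {
        Map<String, Long> ends = consumer.endOffsets(partitions);
        Map<String, Long> lag = new HashMap<>();
        for (String p : partitions) {
            Long end = ends.get(p);
            if (end == null) {
                continue; // no end offset returned for this partition
            }
            // Clamp at zero defensively, e.g. if end offsets come from an earlier lookup.
            lag.put(p, Math.max(0L, end - consumer.position(p)));
        }
        return lag;
    }

    // Sum of per-partition lag, e.g. to compare against a scaling threshold.
    static long total(Map<String, Long> lag) {
        long sum = 0L;
        for (long v : lag.values()) {
            sum += v;
        }
        return sum;
    }
}
```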
