Posted to users@kafka.apache.org by "Bogdan Dimitriu (bdimitri)" <bd...@cisco.com> on 2014/06/06 11:13:38 UTC

Getting the KafkaStream ID

Hello folks,

I’m using Kafka 0.8.0 with the high level consumer and I have a situation where I need to obtain the ID for each of the KafkaStreams that I create.
The KafkaStream class has a method called “clientId()” that I expected would give me just that, but unfortunately it returns the name of the consumer group.
So to make it clear, what I want to obtain is the string that looks like this: myconsumergroup_myhost-1402045464004-2dc0cbf2-0.
Is there any way I could get that value for each of the streams? I’ve looked around the source code but I can’t see any way to do this.

Many thanks,
Bogdan


Re: Getting the KafkaStream ID

Posted by Guozhang Wang <wa...@gmail.com>.
It is not easy to get the fetcher thread ID from a KafkaStream. We do keep
that mapping in topicThreadIdAndQueues, but it is private to the consumer
and not exposed to callers.

Guozhang


On Fri, Jun 6, 2014 at 9:00 AM, Bogdan Dimitriu (bdimitri) <
bdimitri@cisco.com> wrote:

> I see. And can I somehow reliably get the ID of the fetcher thread that is
> providing data for a KafkaStream? Specifically I’d like to know that ID
> from the thread where I’m consuming (where I iterate through the stream).
>
> Thank you,
> Bogdan


-- 
-- Guozhang

Re: Getting the KafkaStream ID

Posted by "Bogdan Dimitriu (bdimitri)" <bd...@cisco.com>.
I see. And can I somehow reliably get the ID of the fetcher thread that is
providing data for a KafkaStream? Specifically I’d like to know that ID
from the thread where I’m consuming (where I iterate through the stream).

Thank you,
Bogdan

On 06/06/2014 16:16, "Guozhang Wang" <wa...@gmail.com> wrote:

>Bogdan,
>
>The kafka stream does not have an ID itself, the one you mentioned is the
>ID of the fetcher thread that put data into the stream. Although there is a
>one-to-one mapping between the fetcher thread and the stream, the Ids of
>the fetcher cannot be accessed from the kafka stream itself.
>
>Guozhang


Re: Getting the KafkaStream ID

Posted by Guozhang Wang <wa...@gmail.com>.
Bogdan,

The kafka stream does not have an ID itself, the one you mentioned is the
ID of the fetcher thread that put data into the stream. Although there is a
one-to-one mapping between the fetcher thread and the stream, the Ids of
the fetcher cannot be accessed from the kafka stream itself.

Guozhang


On Fri, Jun 6, 2014 at 2:13 AM, Bogdan Dimitriu (bdimitri) <
bdimitri@cisco.com> wrote:

> Hello folks,
>
> I’m using Kafka 0.8.0 with the high level consumer and I have a situation
> where I need to obtain the ID for each of the KafkaStreams that I create.
> The KafkaStream class has a method called “clientId()” that I expected
> would give me just that, but unfortunately it returns the name of the
> consumer group.
> So to make it clear, what I want to obtain is the string that looks like
> this: myconsumergroup_myhost-1402045464004-2dc0cbf2-0.
> Is there any way I could get that value for each of the streams? I’ve
> looked around the source code but I can’t see any way to do this.
>
> Many thanks,
> Bogdan
>
>


-- 
-- Guozhang

Re: Getting the KafkaStream ID

Posted by Robert Hodges <be...@gmail.com>.
Hi Bogdan,

It sounds as if you could implement a form of signaling between the
consumers using a distributed barrier.  This can be implemented using Kafka
topics. For example you could create a control thread that posts the
current high-water mark for all consumers into a special topic, which gives
consumers "permission" to advance to that point. Let's say your high-water
mark is based on message timestamps. Here's how the consumers behave:

1.) Read the high-water mark from the signaling topic.
2.) Consume all messages up to the timestamp from the high-water mark.
3.) Go back to #1.
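
The three consumer steps above can be sketched as follows. This is only an illustration in Python, not Kafka API code: the BarrierConsumer class and the (timestamp, payload) message shape are hypothetical, and a real stream iterator would block rather than report exhaustion. The point it shows is that the first message beyond the high-water mark has to be held back, not dropped, until the barrier advances.

```python
from typing import Iterable, List, Optional, Tuple

Message = Tuple[int, str]  # (timestamp, payload); hypothetical message shape


class BarrierConsumer:
    """Consumes messages only up to the currently permitted high-water mark."""

    def __init__(self, messages: Iterable[Message]) -> None:
        self._messages = iter(messages)
        # First message found beyond the mark; kept for the next round.
        self._held: Optional[Message] = None

    def consume_up_to(self, high_water_mark: int) -> List[Message]:
        """Step 2: return every message with timestamp <= high_water_mark."""
        consumed: List[Message] = []
        while True:
            if self._held is None:
                self._held = next(self._messages, None)
                if self._held is None:
                    return consumed  # nothing more to read for now
            timestamp, _payload = self._held
            if timestamp > high_water_mark:
                return consumed  # wait until the barrier advances (step 3)
            consumed.append(self._held)
            self._held = None
```

A consumer thread would call consume_up_to() each time it reads a new mark from the signaling topic (step 1).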

To make this work the consumers need to let the control thread know where
they are so it can post new high-water marks.  Consumers therefore need to
post their latest position periodically (perhaps into a second topic) so
that the control thread can compute the current hindmost position and
advance the barrier whenever the consumers are within some delta of each
other, say 60 seconds or whatever you decide is appropriate.
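
One possible reading of the control thread's computation is sketched below in Python. The function name, the consumer labels and the rule "mark = hindmost position + delta" are assumptions made for illustration; the description above does not fix how far the barrier moves on each step. With this rule no consumer can get more than delta ahead of the slowest one, and the mark never moves backwards.

```python
from typing import Dict


def next_high_water_mark(positions: Dict[str, int], current_mark: int, delta: int) -> int:
    """Compute the next barrier from the consumers' last reported positions.

    positions maps an application-chosen consumer label to the timestamp
    that consumer last reported (for example via a second topic).
    """
    if not positions:
        return current_mark  # nobody has reported yet; leave the barrier alone
    hindmost = min(positions.values())
    # Let consumers run at most `delta` ahead of the slowest one,
    # and never move the barrier backwards.
    return max(current_mark, hindmost + delta)
```

For example, with positions of 100 and 130, a current mark of 120 and a delta of 60, the barrier would advance to 160; if the slowest consumer were still at 40, the barrier would stay at 120.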

This algorithm has a couple of tricky elements.

1.) Stalls.  If either a consumer or the control thread stops, everybody
does, because the barrier will stop moving.  Correctness is therefore key.
I would monitor for failure to advance, as you will undoubtedly encounter
it.

2.) Problems computing the consumer position.  The nastiest one is if your
timestamps walk backwards as might be the case when changing from summer to
winter time, when there is an NTP failure on a producing host, or if
producing code has bugs.  Sadly this happens more often than you would
think.
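
Both tricky elements can be guarded against with very little code. The Python sketch below is a hypothetical illustration (StallDetector and clamp_position are made-up names, not part of Kafka): the first flags a barrier that has not advanced within a timeout, the second keeps a reported position from walking backwards when a message carries an earlier timestamp than its predecessor.

```python
from typing import Optional


class StallDetector:
    """Flags a barrier that has not advanced for `timeout` seconds."""

    def __init__(self, timeout: float) -> None:
        self._timeout = timeout
        self._last_mark: Optional[int] = None
        self._last_change: float = 0.0

    def observe(self, mark: int, now: float) -> bool:
        """Record the current mark; return True if the barrier looks stalled."""
        if self._last_mark is None or mark > self._last_mark:
            self._last_mark = mark
            self._last_change = now
            return False
        return now - self._last_change >= self._timeout


def clamp_position(previous: int, timestamp: int) -> int:
    """Never report a position that moves backwards in time."""
    return max(previous, timestamp)
```

A monitoring loop could call observe() with the latest mark and the current wall-clock time and raise an alert when it returns True.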

Cheers, Robert


On Mon, Jun 9, 2014 at 4:07 AM, Bogdan Dimitriu (bdimitri) <
bdimitri@cisco.com> wrote:

> Certainly.
> I know this may not sound like a great idea but I am running out of
> options here: I’m basically trying to implement a consumer throttle. My
> application consumes from a fairly high number of partitions from a number
> of consumer servers. The data is put in the partitions by a producer in a
> round robin fashion so the number of messages each partition is given is
> even. The messages have a time component assigned to them.
> Now, for a good majority of time the consumers will be faster than the
> producers, so the lags I get with the ConsumerOffsetChecker are mostly 0
> (or 1) and this works well with the time component because once they are
> consumed there is a logical grouping of messages from all partitions based
> on the time component (coarsely).
> The point where I’m starting to get into trouble is when the consumers are
> all stopped for a while and messages start to accumulate in the partitions
> without being consumed (hence the lag increases). Once I resume the
> consumers (with 1 thread per each partition) messages start to get
> consumed very fast, but because some messages take longer to process than
> others, over time the lags start to get very uneven between partitions and
> this starts to interfere with the grouping by the time component.
> So the only way I thought I could prevent this from happening was by
> throttling the “fast” consumers, which have a smaller lag. This will only
> happen rarely, so I thought I could live with the approach. To do that, I
> obviously need to get the lag information periodically. I do that by a
> mechanism similar to what ConsumerOffsetChecker does. But now I need to
> know from which thread to call sleep() and there seems to be no decent way
> to find that out.
> Any better ideas would be highly appreciated.
>
> Thanks,
> Bogdan

Re: Getting the KafkaStream ID

Posted by Jun Rao <ju...@gmail.com>.
The JMX MBean name should be of the form clientId*-ConsumerLag under kafka.server.
Pausing the iteration will indirectly pause the underlying fetcher.
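
A minimal sketch of such a pause, in Python, under two stated assumptions: the consuming thread can read the partition number from each message it iterates over (so it does not need a stream ID), and some other part of the application keeps a partition-to-lag map up to date, whether from JMX or from a ConsumerOffsetChecker-style lookup. The function name and parameters are hypothetical.

```python
from typing import Dict


def pause_seconds(partition: int, lag_by_partition: Dict[int, int],
                  max_spread: int, pause: float = 0.5) -> float:
    """Return how long the thread consuming `partition` should sleep.

    A partition counts as "fast" when its lag is more than `max_spread`
    messages smaller than the largest lag currently observed.
    """
    if partition not in lag_by_partition:
        return 0.0  # no lag data for this partition; do not throttle
    slowest = max(lag_by_partition.values())
    if slowest - lag_by_partition[partition] > max_spread:
        return pause
    return 0.0
```

The consuming thread would call time.sleep(pause_seconds(...)) after handling each message; since the iteration itself pauses, no other thread needs to be identified.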

Thanks,

Jun


On Wed, Jun 11, 2014 at 3:09 AM, Bogdan Dimitriu (bdimitri) <
bdimitri@cisco.com> wrote:

> Which JMX MBeans are you referring to, Jun? I couldn’t find anything that
> gives me the same information as the ConsumerOffsetChecker tool.
> In any case, my main problem is that I don’t know when I should slow down
> the iteration because I don’t know which stream the iteration is
> consuming. I have the global partition lag situation (from the
> ConsumerOffsetChecker), but I don’t know how to apply it to slow down the
> iterators because I can’t identify them.
>
> Thanks,
> Bogdan

Re: Getting the KafkaStream ID

Posted by "Bogdan Dimitriu (bdimitri)" <bd...@cisco.com>.
Which JMX MBeans are you referring to, Jun? I couldn’t find anything that
gives me the same information as the ConsumerOffsetChecker tool.
In any case, my main problem is that I don’t know when I should slow down
the iteration because I don’t know which stream the iteration is
consuming. I have the global partition lag situation (from the
ConsumerOffsetChecker), but I don’t know how to apply it to slow down the
iterators because I can’t identify them.

Thanks,
Bogdan

On 09/06/2014 15:28, "Jun Rao" <ju...@gmail.com> wrote:

>We do have a jmx that reports the lag per partition. You could probably get
>the lag that way. Then, you just need to slow down the iteration on the
>fast partition.
>
>Thanks,
>
>Jun


Re: Getting the KafkaStream ID

Posted by Jun Rao <ju...@gmail.com>.
We do have a jmx that reports the lag per partition. You could probably get
the lag that way. Then, you just need to slow down the iteration on the
fast partition.

Thanks,

Jun


On Mon, Jun 9, 2014 at 4:07 AM, Bogdan Dimitriu (bdimitri) <
bdimitri@cisco.com> wrote:

> Certainly.
> I know this may not sound like a great idea but I am running out of
> options here: I’m basically trying to implement a consumer throttle. My
> application consumes from a fairly high number of partitions from a number
> of consumer servers. The data is put in the partitions by a producer in a
> round robin fashion so the number of messages each partition is given is
> even. The messages have a time component assigned to them.
> Now, for a good majority of time the consumers will be faster than the
> producers, so the lags I get with the ConsumerOffsetChecker are mostly 0
> (or 1) and this works well with the time component because once they are
> consumed there is a logical grouping of messages from all partitions based
> on the time component (coarsely).
> The point where I’m starting to get into trouble is when the consumers are
> all stopped for a while and messages start to accumulate in the partitions
> without being consumed (hence the lag increases). Once I resume the
> consumers (with 1 thread per each partition) messages start to get
> consumed very fast, but because some messages take longer to process than
> others, over time the lags start to get very uneven between partitions and
> this starts to interfere with the grouping by the time component.
> So the only way I thought I could prevent this from happening was by
> throttling the “fast” consumers, which have a smaller lag. This will only
> happen rarely, so I thought I could live with the approach. To do that, I
> obviously need to get the lag information periodically. I do that by a
> mechanism similar to what ConsumerOffsetChecker does. But now I need to
> know from which thread to call sleep() and there seems to be no decent way
> to find that out.
> Any better ideas would be highly appreciated.
>
> Thanks,
> Bogdan

Re: Getting the KafkaStream ID

Posted by "Bogdan Dimitriu (bdimitri)" <bd...@cisco.com>.
Certainly.
I know this may not sound like a great idea but I am running out of
options here: I’m basically trying to implement a consumer throttle. My
application consumes from a fairly high number of partitions from a number
of consumer servers. The data is put in the partitions by a producer in a
round robin fashion so the number of messages each partition is given is
even. The messages have a time component assigned to them.
Now, for a good majority of time the consumers will be faster than the
producers, so the lags I get with the ConsumerOffsetChecker are mostly 0
(or 1) and this works well with the time component because once they are
consumed there is a logical grouping of messages from all partitions based
on the time component (coarsely).
The point where I’m starting to get into trouble is when the consumers are
all stopped for a while and messages start to accumulate in the partitions
without being consumed (hence the lag increases). Once I resume the
consumers (with 1 thread per each partition) messages start to get
consumed very fast, but because some messages take longer to process than
others, over time the lags start to get very uneven between partitions and
this starts to interfere with the grouping by the time component.
So the only way I thought I could prevent this from happening was by
throttling the “fast” consumers, which have a smaller lag. This will only
happen rarely, so I thought I could live with the approach. To do that, I
obviously need to get the lag information periodically. I do that by a
mechanism similar to what ConsumerOffsetChecker does. But now I need to
know from which thread to call sleep() and there seems to be no decent way
to find that out.
Any better ideas would be highly appreciated.

Thanks,
Bogdan

On 08/06/2014 06:25, "Jun Rao" <ju...@gmail.com> wrote:

>Could you elaborate on the use case of the stream ID?
>
>Thanks,
>
>Jun


Re: Getting the KafkaStream ID

Posted by Jun Rao <ju...@gmail.com>.
Could you elaborate on the use case of the stream ID?

Thanks,

Jun


On Fri, Jun 6, 2014 at 2:13 AM, Bogdan Dimitriu (bdimitri) <
bdimitri@cisco.com> wrote:

> Hello folks,
>
> I’m using Kafka 0.8.0 with the high level consumer and I have a situation
> where I need to obtain the ID for each of the KafkaStreams that I create.
> The KafkaStream class has a method called “clientId()” that I expected
> would give me just that, but unfortunately it returns the name of the
> consumer group.
> So to make it clear, what I want to obtain is the string that looks like
> this: myconsumergroup_myhost-1402045464004-2dc0cbf2-0.
> Is there any way I could get that value for each of the streams? I’ve
> looked around the source code but I can’t see any way to do this.
>
> Many thanks,
> Bogdan
>
>