Posted to users@kafka.apache.org by "Sloot, Hans-Peter" <ha...@atos.net> on 2015/03/17 11:54:11 UTC

consumer groups in python

Hi,

I wrote a small Python script to consume messages from Kafka.

The consumer is defined as follows:
from kafka import KafkaConsumer

kafka = KafkaConsumer('my-replicated-topic',
                       metadata_broker_list=['localhost:9092'],
                       group_id='my_consumer_group',
                       auto_commit_enable=True,
                       auto_commit_interval_ms=30 * 1000,
                       auto_offset_reset='smallest')

But when I start two consumers simultaneously, both receive all messages from the topic.
I would expect one consumer to receive about half of the messages and the other one the rest.

How can I arrange this?

Regards Hans-Peter

This e-mail and the documents attached are confidential and intended solely for the addressee; it may also be privileged. If you receive this e-mail in error, please notify the sender immediately and destroy it. As its integrity cannot be secured on the Internet, Atos’ liability cannot be triggered for the message content. Although the sender endeavours to maintain a computer virus-free network, the sender does not warrant that this transmission is virus-free and will not be liable for any damages resulting from any virus transmitted. On all offers and agreements under which Atos Nederland B.V. supplies goods and/or services of whatever nature, the Terms of Delivery from Atos Nederland B.V. exclusively apply. The Terms of Delivery shall be promptly submitted to you on your request.
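What Hans-Peter expects is the behaviour of a coordinated consumer group: the members split the topic's partitions among themselves, so each message goes to only one member of the group. Below is a minimal sketch of one such split (a contiguous range of partitions per member, similar in spirit to what the JVM clients do). It assumes every member sees the same member ids and partition numbers. The ids and the four-partition topic are hypothetical.

```python
def range_assign(member_ids, partitions):
    """Split partitions into contiguous ranges, one range per group member.

    Members and partitions are sorted first so that every member, running
    this independently, arrives at the same answer.
    """
    members = sorted(member_ids)
    parts = sorted(partitions)
    if not members:
        return {}
    per_member, extra = divmod(len(parts), len(members))
    assignment = {}
    start = 0
    for i, member in enumerate(members):
        # The first `extra` members each take one additional partition.
        count = per_member + (1 if i < extra else 0)
        assignment[member] = parts[start:start + count]
        start += count
    return assignment


if __name__ == "__main__":
    # Two consumers in one group, topic with four partitions (hypothetical).
    print(range_assign(["consumer-b", "consumer-a"], [0, 1, 2, 3]))
    # {'consumer-a': [0, 1], 'consumer-b': [2, 3]}
```

Because the unit of work is a partition, a group can keep at most as many consumers busy as the topic has partitions. If there are more members than partitions, the extra members receive nothing.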

Re: consumer groups in python

Posted by Kasper Mackenhauer Jacobsen <ka...@falconsocial.com>.
For now we assign the partitions each Python consumer handles manually, but I'm
looking into a solution (possibly using ZooKeeper) to balance them out
automatically.
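A manual setup like Kasper's can be as simple as giving each process an index and the total number of processes, and letting each process take every N-th partition. The sketch below works under that assumption. `instance_index` and `instance_count` are hypothetical settings (for example, command-line arguments). How the chosen partitions are then handed to the client depends on the kafka-python version, so that step is left out. Nothing here rebalances when a process dies, which is the gap Kasper wants to close.

```python
def partitions_for_instance(partitions, instance_index, instance_count):
    """Statically pick this instance's share of the partitions (round-robin).

    Every instance must be started with the same instance_count and a
    distinct instance_index in range(instance_count).
    """
    if not 0 <= instance_index < instance_count:
        raise ValueError("instance_index must be in range(instance_count)")
    return [p for i, p in enumerate(sorted(partitions))
            if i % instance_count == instance_index]


if __name__ == "__main__":
    all_partitions = [0, 1, 2, 3, 4]
    for idx in range(2):
        print(idx, partitions_for_instance(all_partitions, idx, 2))
    # 0 [0, 2, 4]
    # 1 [1, 3]
```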

On Tue, Mar 17, 2015 at 2:51 PM, Todd Palino <tp...@gmail.com> wrote:

> Yeah, this is exactly correct. The Python client does not implement the
> ZooKeeper logic needed for a balanced consumer. While it's certainly
> possible to implement (for example, Joe did it in Go), the logic is
> non-trivial and nobody has bothered so far. I don't think anyone will,
> as the new consumer, which moves group coordination into the brokers,
> will make it much easier to implement clients without this logic.
>
> In the past, we've used an internal Python module that wraps a C library
> which does the balancing. Now we're moving to one that calls our REST
> interface to Kafka, which is easier to work with. Another option some
> consumers use is to pipe messages in from kafka-console-consumer. This
> works well, but if you're not careful about how you stop it, you can
> easily lose messages.
>
> -Todd



-- 
*Kasper Mackenhauer Jacobsen*

Re: consumer groups in python

Posted by Todd Palino <tp...@gmail.com>.
Yeah, this is exactly correct. The Python client does not implement the
ZooKeeper logic needed for a balanced consumer. While it's certainly
possible to implement (for example, Joe did it in Go), the logic is
non-trivial and nobody has bothered so far. I don't think anyone will,
as the new consumer, which moves group coordination into the brokers,
will make it much easier to implement clients without this logic.
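Roughly, the ZooKeeper logic referred to here works as follows in the 0.8-era high-level consumer. Each consumer registers under `/consumers/<group>/ids` and watches that directory. When the membership changes, every member releases the partitions it owns, recomputes the same deterministic assignment from the sorted member list, and claims its partitions under `/consumers/<group>/owners/<topic>/<partition>`. The sketch below only simulates these steps, using a plain dict in place of ZooKeeper. It has no real ZooKeeper client, no watches, no ephemeral nodes and no retry when a claim collides with an owner that has not yet released; the retry is where much of the non-trivial part lives. The group, topic and consumer names are hypothetical.

```python
class NodeExistsError(Exception):
    """Raised when creating a znode that already exists."""


class FakeZooKeeper:
    """In-memory stand-in for ZooKeeper: maps znode paths to string values."""

    def __init__(self):
        self.nodes = {}

    def create(self, path, value=""):
        # Real ZooKeeper refuses to create an existing node; mimic that so
        # two consumers cannot both claim the same partition.
        if path in self.nodes:
            raise NodeExistsError(path)
        self.nodes[path] = value

    def delete(self, path):
        del self.nodes[path]

    def children(self, parent):
        """Sorted names of the direct children of `parent`."""
        prefix = parent.rstrip("/") + "/"
        return sorted(p[len(prefix):] for p in self.nodes
                      if p.startswith(prefix) and "/" not in p[len(prefix):])


def owner_path(group, topic, partition):
    return "/consumers/%s/owners/%s/%d" % (group, topic, partition)


def release(zk, group, topic, partitions, consumer_id):
    """Rebalance step 1: drop every partition this consumer owns."""
    for p in partitions:
        if zk.nodes.get(owner_path(group, topic, p)) == consumer_id:
            zk.delete(owner_path(group, topic, p))


def claim(zk, group, topic, partitions, consumer_id):
    """Rebalance step 2: compute this consumer's range from the sorted
    member list and register ownership. Every member runs the same code."""
    members = zk.children("/consumers/%s/ids" % group)
    per_member, extra = divmod(len(partitions), len(members))
    i = members.index(consumer_id)
    start = per_member * i + min(i, extra)
    count = per_member + (1 if i < extra else 0)
    mine = sorted(partitions)[start:start + count]
    for p in mine:
        zk.create(owner_path(group, topic, p), consumer_id)
    return mine


if __name__ == "__main__":
    zk = FakeZooKeeper()
    parts = [0, 1, 2, 3]
    zk.create("/consumers/g/ids/c1")          # c1 registers
    print(claim(zk, "g", "t", parts, "c1"))   # [0, 1, 2, 3]
    zk.create("/consumers/g/ids/c2")          # c2 joins; a watch would fire
    release(zk, "g", "t", parts, "c1")        # c1 gives up its partitions
    print(claim(zk, "g", "t", parts, "c1"))   # [0, 1]
    print(claim(zk, "g", "t", parts, "c2"))   # [2, 3]
```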

In the past, we've used an internal Python module that wraps a C library
which does the balancing. Now we're moving to one that calls our REST
interface to Kafka, which is easier to work with. Another option some
consumers use is to pipe messages in from kafka-console-consumer. This
works well, but if you're not careful about how you stop it, you can
easily lose messages.
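On the Python side, the console-consumer pipeline can be very small: read one message per line from standard input. A likely source of the message loss is that the console consumer commits offsets for messages it has already written into the pipe, even if the script dies before processing them. A safer shutdown order is therefore to stop the console consumer first and let the script drain the pipe until end-of-file. This is a sketch under those assumptions. `consume_lines` and the `handle` callback are hypothetical names, and `handle` stands in for real processing.

```python
import io


def consume_lines(stream, handle):
    """Hand each line (one message) to `handle` until end-of-file.

    Returns the number of messages handled. Reading until EOF, rather than
    exiting as soon as a stop signal arrives, drains whatever the upstream
    console consumer has already written into the pipe.
    """
    count = 0
    for line in stream:
        handle(line.rstrip("\n"))
        count += 1
    return count


if __name__ == "__main__":
    # In production `stream` would be sys.stdin; a StringIO stands in here.
    fake_pipe = io.StringIO("first\nsecond\nthird\n")
    seen = []
    print(consume_lines(fake_pipe, seen.append), seen)
```

When the pipeline runs in a terminal, Ctrl-C signals both processes at once. It is safer to signal only the console consumer's process and let the script exit on its own when the pipe closes.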

-Todd


On Tue, Mar 17, 2015 at 6:47 AM, Sloot, Hans-Peter <
hans-peter.sloot@atos.net> wrote:

> Thanks
>
> I just came across this https://github.com/mumrah/kafka-python/issues/112
> It says:
>         That contract of one message per consumer group only works for the
> coordinated consumers which are implemented for the JVM only (i.e., Scala
> and Java clients).

RE: consumer groups in python

Posted by "Sloot, Hans-Peter" <ha...@atos.net>.
Thanks

I just came across this issue: https://github.com/mumrah/kafka-python/issues/112
It says:
        That contract of one message per consumer group only works for the coordinated consumers which are implemented for the JVM only (i.e., Scala and Java clients).
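The quoted contract can be pictured as follows. Every consumer group receives every message. Within a group, however, each partition, and therefore each message, belongs to exactly one member. Below is a toy simulation that assumes a fixed partition-to-member ownership map, like the one a coordinated consumer would produce. The group names, member names and messages are hypothetical.

```python
from collections import defaultdict


def deliver(messages, ownership_by_group):
    """Route (partition, payload) messages to consumers.

    ownership_by_group maps group -> {partition: member}. Each group sees
    every message; inside a group only the owning member receives it.
    """
    received = defaultdict(list)
    for partition, payload in messages:
        for group, owners in ownership_by_group.items():
            received[(group, owners[partition])].append(payload)
    return dict(received)


if __name__ == "__main__":
    msgs = [(0, "m0"), (1, "m1"), (0, "m2"), (1, "m3")]
    groups = {
        "billing": {0: "b1", 1: "b2"},  # two members share the work
        "audit": {0: "a1", 1: "a1"},    # one member gets everything
    }
    for key, payloads in sorted(deliver(msgs, groups).items()):
        print(key, payloads)
    # ('audit', 'a1') ['m0', 'm1', 'm2', 'm3']
    # ('billing', 'b1') ['m0', 'm2']
    # ('billing', 'b2') ['m1', 'm3']
```

Without coordination, Hans-Peter's two processes behave like two separate groups that each own every partition, so each process sees every message.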


-----Original Message-----
From: Steve Miller [mailto:steve@idrathernotsay.com]
Sent: Tuesday, March 17, 2015 2:18 PM
To: users@kafka.apache.org
Subject: Re: consumer groups in python

It's possible that I just haven't used it but I am reasonably sure that the python API doesn't have a way to store offsets in ZK.  You would need to implement something more or less compatible with what the Scala/Java API does, presumably.

On the plus side the python API -- possibly just because in python, nothing is truly private (: -- exposes offsets and offset management in ways that those other APIs seem not to.   Seeking, say, to approximately 1000 messages before the current offset is no big deal in python, nor is fetching oldest and newest offsets for topics (e.g., if you want to alert if nothing is being produced, without having to fire up a consumer).  I have close to zero experience with anything other than the python API and librdkafka but judging from questions I see here those seem to be difficult to do in Scala or Java.  I hope to do more with those APIs soon (and in fact am at ScalaDays right now in part so I can attend some intro Scala training (-: ).

    -Steve




Re: consumer groups in python

Posted by Steve Miller <st...@idrathernotsay.com>.
It's possible that I just haven't used it but I am reasonably sure that the python API doesn't have a way to store offsets in ZK.  You would need to implement something more or less compatible with what the Scala/Java API does, presumably.
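For reference, the 0.8-era Scala/Java high-level consumer keeps committed offsets in ZooKeeper at `/consumers/<group>/offsets/<topic>/<partition>`, with the offset stored as a plain decimal string. The sketch below covers only the path and value encoding that a compatible Python client would need to write and read. The ZooKeeper client itself (for example kazoo) is left out. Check the exact meaning of the stored number (next offset to read vs. last offset read) against the Kafka version in use. The function names are hypothetical.

```python
def offset_path(group, topic, partition):
    """ZooKeeper path the 0.8-era JVM consumer uses for a committed offset."""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)


def encode_offset(offset):
    """Offsets are stored as a decimal string; ZooKeeper node data is bytes."""
    if offset < 0:
        raise ValueError("offset must be non-negative")
    return str(offset).encode("utf-8")


def decode_offset(data):
    return int(data.decode("utf-8"))


if __name__ == "__main__":
    print(offset_path("my_consumer_group", "my-replicated-topic", 0))
    print(encode_offset(1234), decode_offset(b"1234"))
```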

On the plus side the python API -- possibly just because in python, nothing is truly private (: -- exposes offsets and offset management in ways that those other APIs seem not to.   Seeking, say, to approximately 1000 messages before the current offset is no big deal in python, nor is fetching oldest and newest offsets for topics (e.g., if you want to alert if nothing is being produced, without having to fire up a consumer).  I have close to zero experience with anything other than the python API and librdkafka but judging from questions I see here those seem to be difficult to do in Scala or Java.  I hope to do more with those APIs soon (and in fact am at ScalaDays right now in part so I can attend some intro Scala training (-: ).
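Once the oldest and newest offsets for a partition have been fetched, the two offset tricks described above reduce to simple arithmetic. How to fetch those offsets depends on the client version, so that part is left out. This sketch assumes `earliest` is the oldest offset still available on the broker and that `latest` values are successive samples of the newest offset. The function names are hypothetical. The seek is only "approximately" 1000 messages because offsets in a compacted topic can have gaps.

```python
def rewind_target(current, earliest, count=1000):
    """Offset to seek to for roughly `count` messages before `current`.

    Clamped so we never seek before the oldest offset still on the broker
    (older segments may already have been deleted by retention).
    """
    return max(earliest, current - count)


def production_stalled(previous_latest, current_latest):
    """True if the newest offset did not move between two samples."""
    return current_latest <= previous_latest


if __name__ == "__main__":
    print(rewind_target(current=5000, earliest=4500))  # 4500
    print(rewind_target(current=5000, earliest=0))     # 4000
    print(production_stalled(9000, 9000))              # True
```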

    -Steve



> On Mar 17, 2015, at 3:54 AM, Sloot, Hans-Peter <ha...@atos.net> wrote:
> 
> Hi,
> 
> I wrote a small python script to consume messages from kafka.
> 
> The consumer is defined as follows:
> kafka = KafkaConsumer('my-replicated-topic',
>                       metadata_broker_list=['localhost:9092'],
>                       group_id='my_consumer_group',
>                       auto_commit_enable=True,
>                       auto_commit_interval_ms=30 * 1000,
>                       auto_offset_reset='smallest')
> 
> But when I start 2 consumers simultaneously both receive all messages from the topic.
> I would expect to have 1 consumer about half the number of messages and the other the rest.
> 
> How can I arrange this?
> 
> Regards Hans-Peter