Posted to users@kafka.apache.org by David Montgomery <da...@gmail.com> on 2013/02/16 13:48:33 UTC

python and kafka - how to use as a queue

Hi,

I have ZooKeeper and Kafka set up.

I am using this python client:  https://github.com/mumrah/kafka-python

I can send and receive messages, but they are never deleted.

How can I send a message to Kafka so that no other consumer can use it?


I feel I am missing something about how Kafka works

def produce():
    kafka = KafkaClient("xxx.xxx", 9092)
    kafka.send_messages_simple("my-topic", "some message")
    kafka.close()
    print 'done'

def consume():
    kafka = KafkaClient("xxx.xxx", 9092)
    for msg in kafka.iter_messages("my-topic", 0, 0, 1024*1024, False):
        print(msg.payload)
    kafka.close()
    print 'done'

Every time I ran the above, the consumer returned all the previous
messages plus the new ones.

Am I missing something in the server.properties file?

Thanks

Re: python and kafka - how to use as a queue

Posted by David Montgomery <da...@gmail.com>.
Great.  I will follow your progress.  I will use Kafka 0.7.x in the meantime
for development.  I am sure you will announce to the group when the 0.8.x
client has an RC?



Re: python and kafka - how to use as a queue

Posted by David Arthur <mu...@gmail.com>.
It is indeed pure python



Re: python and kafka - how to use as a queue

Posted by David Montgomery <da...@gmail.com>.
The key issue with gevent is that there can be no C bindings.  If the
client is pure Python, its sockets can be monkey-patched.  I use gevent
to run redis-py to make async calls to Redis even though the client is
blocking by nature.  I believe your client is pure Python?

Thanks
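(As an aside, the reason import order matters with monkey patching can be shown in miniature. This is only an illustration of the mechanism, plain module-attribute replacement, not gevent itself; the class names here are made up for the sketch:)

```python
import socket

# A client that binds socket.socket at import time (common in libraries):
class EarlyBindingClient(object):
    sock_factory = socket.socket  # captured when this module is imported

# Stand-in for gevent's cooperative socket class:
class FakeCooperativeSocket(object):
    pass

# Monkey patching, in miniature: replace the module attribute.
original = socket.socket
socket.socket = FakeCooperativeSocket

# Later lookups through the module see the patch...
assert socket.socket is FakeCooperativeSocket
# ...but names bound before the patch was applied do not:
assert EarlyBindingClient.sock_factory is original

socket.socket = original  # undo the patch
```

(This is why gevent's monkey patching is normally applied before importing the client library.)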



Re: python and kafka - how to use as a queue

Posted by David Arthur <mu...@gmail.com>.
Replying to both messages inline:

On 2/16/13 9:07 PM, David Montgomery wrote:
> By the way..I assume that python-kafka is gevent safe?
No idea, I've never used greenlet/gevent before. If the question is "are
you doing anything unusual in kafka-python", then the answer is no. Just
basic method calls and some socket stuff.
>
> Thanks
>
>
> On Sun, Feb 17, 2013 at 10:04 AM, David Montgomery <
> davidmontgomery@gmail.com> wrote:
>
>> Ok...great...I see now about offsets.  I see how I can manage on
>> restarts.  Thanks!
>>
>> So...considering I am in a disposable machine world then I will consider
>> redis as a centralized store.  Makes sense?
You can certainly use Redis as a fast, in-memory queue. It is, of
course, an entirely different system with different design goals and
features. The applicability of Redis or Kafka depends on your use case.
>>
>> What is the time frame for v8 release?
I believe it is fairly imminent, maybe sometime in March?


Re: python and kafka - how to use as a queue

Posted by David Montgomery <da...@gmail.com>.
By the way, I assume that kafka-python is gevent-safe?

Thanks



Re: python and kafka - how to use as a queue

Posted by David Montgomery <da...@gmail.com>.
Ok...great...I see now about offsets.  I see how I can manage on restarts.
Thanks!

So, considering I am in a disposable-machine world, I will consider
Redis as a centralized store.  Makes sense?
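(A centralized offset store on top of Redis only needs get/set on a per-(group, topic, partition) key. A rough sketch follows; a plain dict stands in for the redis-py client, which exposes the same get/set surface, so nothing here depends on a live server, and all names are made up:)

```python
class OffsetStore(object):
    """Offsets keyed per (consumer group, topic, partition), so any
    disposable machine can resume where the last one stopped."""

    def __init__(self, backend):
        # backend would be e.g. a redis-py client in production
        self.backend = backend

    def _key(self, group, topic, partition):
        return "offsets:%s:%s:%d" % (group, topic, partition)

    def commit(self, group, topic, partition, offset):
        self.backend.set(self._key(group, topic, partition), str(offset))

    def fetch(self, group, topic, partition):
        raw = self.backend.get(self._key(group, topic, partition))
        return int(raw) if raw is not None else 0

class DictBackend(dict):
    """In-memory stand-in mimicking the redis get/set surface."""
    def set(self, key, value):
        self[key] = value
    def get(self, key):
        return dict.get(self, key)
```

(With redis-py, `DictBackend()` would be replaced by the real client; the str/int round-trip mirrors how values come back from Redis as strings/bytes.)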

What is the time frame for the 0.8 release?

Re: python and kafka - how to use as a queue

Posted by David Arthur <mu...@gmail.com>.
Greetings!

I am the maintainer of kafka-python. Very cool to see it used in the wild.

The kafka-python library supports the low-level protocols of Kafka 0.7 
(Produce/Fetch/MultiProduce/MultiFetch). When you ask Kafka for messages 
via a Fetch request, you specify an offset + range (much like reading a 
file). The `iter_messages` helper returns an iterator that automatically 
handles paging offsets through successive Fetch requests. However, it 
does not support _saving_ your offsets. One of the parameters to 
iter_messages is the offset to start at, so when you re-run your script 
it will start at that point again.
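(The offset paging described above can be sketched as a loop over successive fetches. `fetch` here is just a placeholder for the client's low-level Fetch call, and its return shape is invented for the sketch:)

```python
def iter_from(fetch, start_offset):
    """Page through a log by repeatedly fetching from the last offset seen.

    `fetch(offset)` is assumed to return a list of (next_offset, payload)
    pairs, empty once the log is exhausted.
    """
    offset = start_offset
    while True:
        batch = fetch(offset)
        if not batch:
            return
        for next_offset, payload in batch:
            yield offset, payload
            offset = next_offset  # resume the next request here

# A three-message in-memory "log" to drive the sketch:
log = ["alpha", "beta", "gamma"]

def fake_fetch(offset):
    return [(offset + 1, log[offset])] if offset < len(log) else []
```

(Re-running with the same start_offset replays the same messages, which is exactly the behavior observed in the original question.)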

In 0.7, clients must talk to ZooKeeper in order to persist offsets in a 
Kafka-compatible way (or they could just save them locally depending on 
the use case). Talking to ZooKeeper from Python is somewhat troublesome, 
and implementing the Kafka "consumer group rebalancing" is even more 
troublesome - so I chose to omit it.

In 0.8 (not yet released), consumer offsets are managed centrally by the 
Kafka brokers, which expose APIs for clients to commit and fetch offsets. 
I am in the process of implementing a 0.8-compatible version of kafka-python.

So for the time being, you are on your own with regards to offset 
management :-/
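(Until then, for a single consumer a local checkpoint file is enough. A minimal sketch; the helpers and file name are mine, not part of kafka-python:)

```python
import os

def load_offset(path):
    """Return the offset to resume from, or 0 for a fresh start."""
    if os.path.exists(path):
        with open(path) as f:
            return int(f.read().strip())
    return 0

def save_offset(path, offset):
    """Persist the next offset; write-then-rename keeps it crash-safe."""
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write(str(offset))
    os.replace(tmp, path)
```

(The consume loop would call load_offset before starting iter_messages at that offset, and save_offset after each message or batch.)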

Cheers!

-David



Re: python and kafka - how to use as a queue

Posted by Philip O'Toole <ph...@loggly.com>.
You need to read the Kafka design docs. Kafka does not delete messages just because a Consumer reads them, and it does not track which messages have been consumed by any Consumer.

It is up to Consumers to start where they left off, by always asking for the right message (via offsets).

Philip
