Posted to users@kafka.apache.org by Keith Wiley <ke...@atigeo.com> on 2015/07/29 05:47:02 UTC

kafka-python message offset?

I haven’t found a way to specify that a consumer should read from the beginning, or from any other explicit offset, or that the offset should be “reset” in any way.  The command-line shell scripts (which I believe simply wrap the Scala tools) have flags for this sort of thing.  Is there any way to do this through the python library?

Thanks.

Re: kafka-python message offset?

Posted by Keith Wiley <ke...@atigeo.com>.
Oh, I'm sorry.  If I use the KafkaConsumer class instead of the SimpleConsumer class (as you suggested) it works.  Frustratingly, SimpleConsumer will take the auto_offset_reset parameter without complaining that no such parameter exists, yet it doesn't work properly!  But KafkaConsumer works, so I'm in better shape now.  Thank you.


Keith Wiley
Senior Software Engineer, Atigeo
keith.wiley@atigeo.com

Re: kafka-python message offset?

Posted by Keith Wiley <ke...@atigeo.com>.
I got it.  It has been tricky getting both consumer classes to work since they are not very similar.  I configured one incorrectly because they take the arguments in different orders (in one topic comes before group and in the other that order is reversed).  Now that it works, I can also see that the message classes they return are different, so the contents must be teased out in slightly different ways.

It's basically working now.
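
For readers who hit the same difference in message shapes, here is a minimal sketch of normalizing both. The namedtuples are stand-ins that assume the field layout used by kafka-python 0.9.x (a flat record from KafkaConsumer, an offset paired with a nested message from SimpleConsumer); they are not imported from the library, and `extract` is a hypothetical helper.

```python
from collections import namedtuple

# Stand-in record types (assumed layout, not imported from kafka-python).
KafkaMessage = namedtuple("KafkaMessage", ["topic", "partition", "offset", "key", "value"])
Message = namedtuple("Message", ["magic", "attributes", "key", "value"])
OffsetAndMessage = namedtuple("OffsetAndMessage", ["offset", "message"])


def extract(record):
    """Return (offset, key, value) from either record shape."""
    inner = getattr(record, "message", None)
    if inner is not None:
        # SimpleConsumer-style: the payload is nested one level down.
        return record.offset, inner.key, inner.value
    # KafkaConsumer-style: the fields are flat.
    return record.offset, record.key, record.value


flat = KafkaMessage("test", 0, 7, None, b"asdf")
nested = OffsetAndMessage(7, Message(0, 0, None, b"asdf"))
```

Both `extract(flat)` and `extract(nested)` give the same tuple, so the rest of a script need not care which consumer class produced the record.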


Keith Wiley
Senior Software Engineer, Atigeo
keith.wiley@atigeo.com






Re: kafka-python message offset?

Posted by Dana Powers <da...@rd.io>.
Have you tried not setting a group_id in SimpleConsumer? If you have stored
offsets in ZK for that group, and those offsets still exist on the server,
the consumer will use them and not 'reset'. My hunch is that is your
problem. You might also consider enabling kafka debug logs (though not for
the faint-of-heart). Try initializing SimpleConsumer like so:

```
# SimpleConsumer takes the group before the topic; None means no group
s_consumer = SimpleConsumer(client, None, topic,
                            auto_offset_reset='smallest')
```

General thought: SimpleConsumer.seek() is a poorly constructed API. We have
not deprecated it yet, but I recommend switching to KafkaConsumer instead.
The same goes for SimpleConsumer as a whole, actually. Its API is quite
old and is maintained mostly to avoid breaking legacy installations.

Does KafkaConsumer.set_topic_partitions() work for your purposes? It will
take absolute, but not relative, offsets. Combined w/ an auto_offset_reset
policy, however, this should fulfill most use cases:

start from head: auto_offset_reset='smallest'
start from tail: auto_offset_reset='largest'
start from ZK-stored offset w/ reset to head: group_id='foo', auto_offset_reset='smallest'
start from ZK-stored w/ reset to tail: group_id='foo', auto_offset_reset='largest'
start from offline-stored offset w/ reset to head: auto_offset_reset='smallest'; consumer.set_topic_partitions({('topic', 0): 1234})
start from offline-stored offset w/ reset to tail: auto_offset_reset='largest'; consumer.set_topic_partitions({('topic', 0): 1234})
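
The reset rule behind those combinations can be modelled without a broker. The sketch below is not kafka-python code: `resolve_start_offset` and its parameters are hypothetical names, and it assumes a stored offset is honoured only while it still falls inside the range the server retains.

```python
def resolve_start_offset(stored, earliest, latest, auto_offset_reset):
    """Pick the offset a consumer would start from in this simplified model.

    stored: offset from ZK or offline storage, or None if there is none
    earliest, latest: bounds of what the server still retains
    auto_offset_reset: 'smallest' (head) or 'largest' (tail)
    """
    if stored is not None and earliest <= stored <= latest:
        return stored  # a valid stored offset wins; no reset happens
    if auto_offset_reset == "smallest":
        return earliest
    if auto_offset_reset == "largest":
        return latest
    raise ValueError("unknown auto_offset_reset: %r" % (auto_offset_reset,))


# No stored offset: the policy decides.
from_head = resolve_start_offset(None, 100, 500, "smallest")
from_tail = resolve_start_offset(None, 100, 500, "largest")
# Stored offset still on the server: used as-is, whatever the policy says.
from_stored = resolve_start_offset(1234, 1000, 5000, "largest")
# Stored offset has aged out: fall back to the policy.
aged_out = resolve_start_offset(50, 100, 500, "smallest")
```

The third case is the one suspected earlier in this message: with a group_id whose offsets still exist, the policy never gets a chance to apply.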

If there's another use-case here that you think should be covered, please
hop over to github.com/mumrah/kafka-python


-Dana
(kafka-python maintainer; KafkaConsumer author)



Re: kafka-python message offset?

Posted by Keith Wiley <ke...@atigeo.com>.
Thanks.  I got it to work if I use KafkaConsumer.  It doesn't yet work with SimpleConsumer, and that includes seeking to 0,0.  I'm curious why that isn't getting it going.  It's frustrating because SimpleConsumer supports seek while KafkaConsumer doesn't offer a seek function, but at the same time, I can reset KafkaConsumer to the beginning with auto_offset_reset while SimpleConsumer has yet to work for me in any way at all, including with a seek.  So far, neither class is optimal for me (SimpleConsumer doesn't work at all yet and KafkaConsumer has no seek function).

I'm using kafka-python 0.9.4 btw, just whatever version came up when pip installed it.


Keith Wiley
Senior Software Engineer, Atigeo
keith.wiley@atigeo.com






Re: kafka-python message offset?

Posted by Steve Miller <st...@idrathernotsay.com>.
   Are you using mumrah/kafka-python?  I think so from context but I know there's at least one other implementation rattling around these days. (-:

   If that's what you're using, I can see two potential problems you might be having.  You can set the offset to some approximation of wherever you want by using:

	s_consumer.seek(offset, whence)

"pydoc kafka.consumer" says:

     |  seek(self, offset, whence)
     |      Alter the current offset in the consumer, similar to fseek
     |      
     |      offset: how much to modify the offset
     |      whence: where to modify it from
     |              0 is relative to the earliest available offset (head)
     |              1 is relative to the current offset
     |              2 is relative to the latest known offset (tail)

   So if you want to go to the beginning, do seek(0, 0).

   Seeking to the beginning or the end should be pretty reliable.  You can seek (say) 100K messages relative to the beginning or the end or to the current offset, but with partitions and message arrival order and the like it's a bit of a crapshoot where you'll end up.  The API will divide your offset by the number of partitions, then (I believe) apply that delta to each partition, hoping that the input stream is relatively well balanced.

   But again, seek(0, 0) or seek(0, 2) has always worked reasonably for me, so much so that I don't remember the default.
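
The divide-and-apply behaviour described above can be sketched as a small model. This is an illustration of the description in this message, not the library's code: `model_seek` is a hypothetical function, it only covers whence 0 and 2, and its remainder handling (plain floor division) is an assumption.

```python
def model_seek(earliest, latest, offset, whence):
    """Model seek(offset, whence) for whence 0 (head) or 2 (tail).

    earliest, latest: dicts mapping partition -> offset bound
    Returns a dict mapping partition -> new offset.
    """
    if whence == 0:
        base = earliest
    elif whence == 2:
        base = latest
    else:
        raise ValueError("this sketch only models whence 0 and 2")
    # Split the requested distance evenly across partitions (floor division).
    delta = offset // len(base)
    return {partition: start + delta for partition, start in base.items()}


# Three partitions holding very different amounts of data.
earliest = {0: 0, 1: 0, 2: 0}
latest = {0: 40000, 1: 10, 2: 90000}

at_head = model_seek(earliest, latest, 0, 0)
skip_90k = model_seek(earliest, latest, 90000, 0)
back_300 = model_seek(earliest, latest, -300, 2)
```

In this model `seek(0, 0)` lands exactly on the head of every partition, while the two relative seeks push partition 1 outside the range it actually holds, which is the "crapshoot" on an unbalanced topic.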

   The other thing you might be running into is that if you are setting the partitions parameter to an array containing only zero, and your topic has more partitions than just partition #0, the producer might be publishing to a different partition.  But you've told the client to read only from partition 0, so in that case you'd see no data.  If you want to consume from every partition, don't pass in a partitions parameter.
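
A toy model makes the partition mismatch concrete. Nothing here talks to Kafka: the `log` dict stands in for a three-partition topic and `consume` is a hypothetical helper, with the assumption that the producer happened to write only to partition 2.

```python
def consume(log, partitions=None):
    """Return messages from the chosen partitions (all of them if None)."""
    if partitions is None:
        partitions = sorted(log)
    return [msg for p in partitions for msg in log.get(p, [])]


# Toy topic with three partitions; everything landed in partition 2.
log = {0: [], 1: [], 2: ["Test", "test", "tests", "asdf"]}

only_zero = consume(log, partitions=[0])  # reader pinned to partition 0
everything = consume(log)                 # no partitions argument
```

Pinned to partition 0, the reader gets an empty list even though the topic holds four messages; without the partitions argument it sees all of them.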

	-Steve


Re: kafka-python message offset?

Posted by Keith Wiley <ke...@atigeo.com>.
I'm still not getting the necessary behavior.  If I run on the command line, I get a series of messages:

--------
$ ./kafka-console-consumer.sh --zookeeper w.x.y.z:p --topic test --from-beginning
Test
test
tests
asdf
--------

If I exclude the --from-beginning argument then it hangs, which indicates to me that the offset is currently at the end and awaiting new messages. If I run through python, it also hangs, which is why I suspect it is insistently reading from the end.  See below:

--------
    from kafka import KafkaClient, SimpleConsumer

    # servers, topic and group_id are assumed to be defined earlier in the script
    print("Begin all\n")

    print("Begin constructing SimpleConsumer")
    client = KafkaClient(servers)
    s_consumer = SimpleConsumer(client,
                                topic,
                                group_id,
                                partitions=[0],  # Not sure I need this
                                auto_commit=False,  # Tried with and without, doesn't fix the problem
                                auto_offset_reset='smallest'  # Tried with and without, doesn't fix the problem
                                )
    print("End constructing SimpleConsumer\n")

    print("Begin reading messages")
    try:
        for message in s_consumer:
            print("  New message")
            # str() so that non-string fields (e.g. integer offsets) can be concatenated
            print("  " + str(message.topic))
            print("  " + str(message.partition))
            print("  " + str(message.offset))
            print("  " + str(message.key))
            print("  " + str(message.value))
    except Exception as e:
        print("Exception: %s" % e)
    print("End reading messages\n")

    print("End all")
--------

Output:

Begin all

Begin constructing SimpleConsumer
End constructing SimpleConsumer

Begin reading messages

--------
It just hangs after that.  I also tried with a KafkaConsumer instead of a SimpleConsumer and it does exactly the same thing.  I'm not sure what to do.

----------------------------------------------------------------------------------------
Keith Wiley
Senior Software Engineer, Atigeo
keith.wiley@atigeo.com






Re: kafka-python message offset?

Posted by Dana Powers <da...@rd.io>.
Hi Keith,

You can use the `auto_offset_reset` parameter to kafka-python's
KafkaConsumer. It behaves the same as the Java consumer configuration of
the same name. See
http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.KafkaConsumer.configure
for more details on how to configure a KafkaConsumer instance.

For fine-grained control over topic/partition offsets, use
KafkaConsumer.set_topic_partitions(). For the most control, pass a
dictionary of {(topic, partition): offset, ...}. See
http://kafka-python.readthedocs.org/en/latest/apidoc/kafka.consumer.html#kafka.consumer.kafka.KafkaConsumer.set_topic_partitions
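
As a rough illustration of that dictionary shape, the sketch below builds it from offsets saved per partition. `build_offsets` and the sample values are hypothetical; only the `{(topic, partition): offset}` form and the `set_topic_partitions()` call come from the text above.

```python
def build_offsets(topic, saved):
    """Turn {partition: offset} into the {(topic, partition): offset} shape."""
    return {(topic, partition): offset for partition, offset in saved.items()}


# Offsets recorded somewhere outside Kafka, keyed by partition number.
saved = {0: 1234, 1: 987}
offsets = build_offsets("test", saved)

# With a live consumer, the result would be handed over as:
#   consumer.set_topic_partitions(offsets)
```

Keeping the saved form keyed by partition alone makes it easy to reuse the same stored offsets for a renamed or mirrored topic.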

-Dana
