Posted to users@kafka.apache.org by Emmett Butler <em...@parsely.com> on 2018/06/19 21:42:31 UTC

Re: [pykafka-user] Re: Timestamp based reset_offset fails with OffsetOutOfRangeError

Thanks! I ask because I suspect that having only a single log segment (as
your partition does) may hamstring reset_offsets(). I haven't verified this
experimentally. Maybe someone on the Kafka users group has insight.
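
To make the single-segment hypothesis concrete, here is a toy model of a timestamp lookup that works at log-segment granularity. It assumes the broker resolves a timestamp to the base offset of the newest segment last modified at or before that time, which is how older versions of the offset-listing API are generally described. Whether pykafka 2.7.0 takes that path against Kafka 1.1.0 is not confirmed anywhere in this thread. The function name, the segment tuples, and the timestamps are all hypothetical.

```python
from typing import List, Optional, Tuple

# Each segment is modelled as (base_offset, last_modified_ms).
# This is a toy model for illustration, not Kafka's implementation.
Segment = Tuple[int, int]


def offset_before(segments: List[Segment], timestamp_ms: int) -> Optional[int]:
    """Return the base offset of the newest segment that was last modified
    at or before timestamp_ms, or None when no segment qualifies."""
    for base_offset, last_modified_ms in sorted(segments, reverse=True):
        if last_modified_ms <= timestamp_ms:
            return base_offset
    return None


# One segment that is still being written: its modification time is recent,
# so an earlier timestamp matches nothing.
single = [(0, 1529336100000)]
print(offset_before(single, 1529336002000))   # None

# Several segments: the lookup lands on a segment boundary,
# not on the exact message written at that time.
several = [(0, 1529335000000), (40, 1529335900000), (80, 1529336100000)]
print(offset_before(several, 1529336002000))  # 40
```

Under this assumption, a partition with one active segment has no valid answer for any timestamp older than the segment's last write, which would be consistent with an out-of-range error followed by a fallback reset.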

On Tue, Jun 19, 2018 at 1:55 AM, Moe <vo...@gmail.com> wrote:

> Thanks for getting back to me so speedily. To be frank, I do not know how
> log segments differ from logs.
>
> *# This is a view into my logs folder:*
> [not preserved in the plain-text archive]
>
> *# And this is the view into that folder for my offset_retrieval_test.001
> partition (the topic in question):*
> [not preserved in the plain-text archive]
>
> Please let me know if this isn't what you were asking for! The topic was
> auto-created, so all settings are default (7 days retention if my
> memory serves me right, but I'm sure you know that better than I do).
>
> I appreciate the time and effort,
> Moritz
>
> 2018-06-18 18:27 GMT+02:00 Emmett Butler <em...@parsely.com>:
>
>> How many log segments does this partition have, and what are the
>> retention settings for the topic? You can find the log segments by looking
>> in the directory pointed to by the log.dirs server configuration.
>>
>> On Mon, Jun 18, 2018 at 7:43 AM, Moe <vo...@gmail.com> wrote:
>>
>>> *# Sorry, I should have mentioned that the timestamp I use is one I
>>> read from the server GUI*
>>>
>>>
>>> On Monday, 18 June 2018 at 16:39:50 UTC+2, Moe wrote:
>>>>
>>>> *Hi,*
>>>>
>>>>
>>>> *I'm struggling to implement a timestamp-based reset_offsets() and am
>>>> hoping someone here can show me the light. In essence, when I call
>>>> reset_offsets() with a timestamp, I get*
>>>>
>>>> *"OffsetOutOfRangeError"*
>>>>
>>>>
>>>> *# Setup:*
>>>> - Kafka:
>>>> Kafka 1.1.0 (also tried with 1.0.0) running in Docker
>>>> (Wurstmeister/Kafka image)
>>>>
>>>> - Consumer/Producer:
>>>> Ubuntu 17.04
>>>> pykafka 2.7.0
>>>> Python 3.6
>>>>
>>>>
>>>> *# Kafka Connection*
>>>> import json
>>>> import time
>>>>
>>>> from pykafka import KafkaClient
>>>>
>>>> client = KafkaClient(hosts='10.25.42.127:9092', broker_version="1.0.0")
>>>> topic = client.topics['offset_retrieval_test.001'.encode()]
>>>>
>>>> *# Producer*
>>>> delivery_reports=False
>>>> auto_start=True
>>>> linger_ms=10.0
>>>>
>>>> ksp = topic.get_producer(delivery_reports=delivery_reports,
>>>>                          linger_ms=linger_ms, auto_start=auto_start)
>>>>
>>>> for i in range(100):
>>>>
>>>>     dict_obj = {
>>>>         'time': time.time(),
>>>>         'value': i
>>>>     }
>>>>
>>>>     data = json.dumps(dict_obj)
>>>>
>>>>     ksp.produce(data.encode())
>>>>     time.sleep(0.25)
>>>>
>>>> *# This results in data being written to my Kafka Server as follows:*
>>>> [not preserved in the plain-text archive]
>>>>
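>>>> *# But when I call reset_offsets() such as:*
>>>> # The original post does not show how `consumer` was created; a simple
>>>> # consumer on the same topic is assumed here.
>>>> consumer = topic.get_simple_consumer()
>>>>
>>>> # Note: time.mktime() interprets the parsed time as local time.
>>>> offset_ts = int(time.mktime(time.strptime('2018-06-18 15:33:22',
>>>>                                           '%Y-%m-%d %H:%M:%S')) * 1000)
>>>> consumer.reset_offsets([(topic.partitions[0], offset_ts)])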
>>>>
>>>> while True:
>>>>     message = consumer.consume()
>>>>     dict_obj = json.loads(message.value)
>>>>
>>>>     print(dict_obj)
>>>>
>>>> *# I get the below error and a reset to 0*
>>>> [not preserved in the plain-text archive]
>>>>
>>>> *# Potentially related; printing the message timestamp (as below)
>>>> results in 0 being printed*
>>>>
>>>> while True:
>>>>     message = consumer.consume()
>>>>     dict_obj = json.loads(message.value)
>>>>
>>>>     print(message.timestamp)
>>>>
>>>>
>>>> *Thanks a million,*
>>>> Moritz
>>>>
>>>>
>>>> --
>>> You received this message because you are subscribed to the Google
>>> Groups "pykafka-user" group.
>>> To unsubscribe from this group and stop receiving emails from it, send
>>> an email to pykafka-user+unsubscribe@googlegroups.com.
>>> To post to this group, send email to pykafka-user@googlegroups.com.
>>> Visit this group at https://groups.google.com/group/pykafka-user.
>>> To view this discussion on the web visit
>>> https://groups.google.com/d/msgid/pykafka-user/77c980bb-d3e8-49f9-bbe8-358566ce797b%40googlegroups.com
>>>
>>> For more options, visit https://groups.google.com/d/optout.
>>>


-- 
Emmett Butler | Senior Software Engineer
<http://www.parsely.com/?utm_source=Signature&utm_medium=emmett-butler&utm_campaign=Signature>

Re: [pykafka-user] Re: Timestamp based reset_offset fails with OffsetOutOfRangeError

Posted by Moe <vo...@gmail.com>.
Thanks, I'll do some additional digging. Not sure if it's relevant, but for
the record, numerical offset-based resetting works like a charm.
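
Since numerical resets work, one possible workaround is to resolve the timestamp to an offset on the client side and reset to that number. The sketch below assumes each message value is JSON with a 'time' field in epoch seconds, as in the producer loop from the original post, and that those times never decrease with offset. The helper name and the stand-in data are hypothetical; in practice the (offset, value) pairs would come from consuming the partition.

```python
import bisect
import json
from typing import List, Optional, Tuple


def first_offset_at_or_after(records: List[Tuple[int, bytes]],
                             target_s: float) -> Optional[int]:
    """records: (offset, raw JSON value) pairs in offset order.
    Returns the first offset whose payload 'time' is >= target_s,
    or None if every message is older than target_s."""
    times = [json.loads(value)["time"] for _, value in records]
    idx = bisect.bisect_left(times, target_s)
    return records[idx][0] if idx < len(records) else None


# Stand-in data: 100 messages written 0.25 s apart, starting at t0.
t0 = 1529336000.0
records = [(i, json.dumps({"time": t0 + 0.25 * i, "value": i}).encode())
           for i in range(100)]

print(first_offset_at_or_after(records, t0 + 2.0))     # 8
print(first_offset_at_or_after(records, t0 + 1000.0))  # None
```

The resulting number could then be passed to reset_offsets() in place of the timestamp. Whether it needs adjusting by one depends on how the client interprets the offset it is given, which is worth checking against a known message first.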
