You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Phil Luckhurst <ph...@encycle.com> on 2017/10/10 12:20:09 UTC

Incorrect consumer offsets after broker restart 0.11.0.0

We have a Kafka broker we use for testing that we have recently updated from 0.9.0.1 to 0.11.0.0 and our java consumer is built using the 0.11.0.0 client. The consumers manually commit offsets and are consuming messages as expected since the upgrade. If we restart the consumers they fetch the previously committed offset from the broker and restart processing new messages as expected. Kafka Manager reports the offsets we expect to see. However, if we restart the broker the consumer receives an old offset from the broker and we can end up re-processing several days' worth of messages.

We have identified the __consumers_offset partition where the offsets are being stored and if we use the console consumer to consume from that partition we see a new message appear each time our consumer commits its offsets. The commands we use are:

echo "exclude.internal.topics=false" > /tmp/consumer.config
/opt/kafka/bin/kafka-console-consumer.sh --consumer.config /tmp/consumer.config --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server localhost:9092 --topic __consumer_offsets --consumer-property group.id=test-offsets-consumer-group --partition 43

And the output shows our consumer group and topic partition for each commit the consumer sends, the reported offset is correct.

[ta-eng-cob1-tstat-events,ta-eng-cob1-ayla,0]::[OffsetMetadata[1833602,NO_METADATA],CommitTime 1507632714328,ExpirationTime 1507719114328]

We also used the following command to check that these commits also trigger a new record to be written to the latest __consumer_offset_43 partition log file and we see a new record added to the partition log file every time the consumer commits offsets.

/opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --print-data-log --files /var/lib/kafka/__consumer_offsets-43/00000000000003780553.log

baseOffset: 4006387 lastOffset: 4006387 baseSequence: 0 lastSequence: 0 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 16 isTransactional: false position: 30359857 CreateTime: 1507632866696 isvalid: true size: 147 magic: 2 compresscodec: NONE crc: 1175188994

Everything appears to be working as expected until we restart the broker which then returns an old offset to the consumer.

For example, in the consumer debug output we see the last commit before the broker restart is 1828033

2017-10-09 11:55:16,056 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events committed offset 1828033 for partition ta-eng-cob1-ayla-0

After we restart the broker we see the consumer receive an old offset of 1791273 from the broker.

2017-10-09 11:57:22,735 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher: Resetting offset for partition ta-eng-cob1-ayla-0 to the committed offset 1791273

If we just restart the consumer the fetch returns the correct offset information from the broker.

2017-10-09 11:52:25,984 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events committed offset 1828015 for partition ta-eng-cob1-ayla-0
2017-10-09 11:53:21,658 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher: Resetting offset for partition ta-eng-cob1-ayla-0 to the committed offset 1828015

There don't appear to be any errors in the broker logs to indicate a problem, so the question is what is making the broker return the incorrect offset when it is restarted?

Thanks,
Phil Luckhurst


Re: Incorrect consumer offsets after broker restart 0.11.0.0

Posted by Elyahou Ittah <el...@fiverr.com>.
Yes, this is the Jira ticket about this issue:

https://issues.apache.org/jira/browse/KAFKA-5600

On Wed, Oct 11, 2017 at 5:47 PM, Vincent Dautremont <
vincent.dautremont@olamobile.com.invalid> wrote:

> I would also like to know the related Jira ticket if any, to check that
> what I experience the same phenomenon.
> I see this happening even without restarting the kafka broker process :
>
> I sometime have a Zookeeper socket that fails, the Kafka broker then step
> down from its leader duties for a few seconds before entering the cluster,
> Consumers gets a response from the broker that it is not anymore part of
> the cluster.
>
> So my client resets, gets reassigned partitions and gets some old CG
> offsets from day ago (often from log data that was already deleted).
>
> Thanks.
>
>
> On Wed, Oct 11, 2017 at 1:59 PM, Phil Luckhurst <
> phil.luckhurst@encycle.com>
> wrote:
>
> > Upgrading the broker to version 0.11.0.1 has fixed the problem.
> >
> > Thanks.
> >
>
> --
> The information transmitted is intended only for the person or entity to
> which it is addressed and may contain confidential and/or privileged
> material. Any review, retransmission, dissemination or other use of, or
> taking of any action in reliance upon, this information by persons or
> entities other than the intended recipient is prohibited. If you received
> this in error, please contact the sender and delete the material from any
> computer.
>

Re: Incorrect consumer offsets after broker restart 0.11.0.0

Posted by Vincent Dautremont <vi...@olamobile.com.INVALID>.
I would also like to know the related Jira ticket if any, to check that
what I experience the same phenomenon.
I see this happening even without restarting the kafka broker process :

I sometime have a Zookeeper socket that fails, the Kafka broker then step
down from its leader duties for a few seconds before entering the cluster,
Consumers gets a response from the broker that it is not anymore part of
the cluster.

So my client resets, gets reassigned partitions and gets some old CG
offsets from day ago (often from log data that was already deleted).

Thanks.


On Wed, Oct 11, 2017 at 1:59 PM, Phil Luckhurst <ph...@encycle.com>
wrote:

> Upgrading the broker to version 0.11.0.1 has fixed the problem.
>
> Thanks.
>

-- 
The information transmitted is intended only for the person or entity to 
which it is addressed and may contain confidential and/or privileged 
material. Any review, retransmission, dissemination or other use of, or 
taking of any action in reliance upon, this information by persons or 
entities other than the intended recipient is prohibited. If you received 
this in error, please contact the sender and delete the material from any 
computer.

RE: Incorrect consumer offsets after broker restart 0.11.0.0

Posted by Phil Luckhurst <ph...@encycle.com>.
Upgrading the broker to version 0.11.0.1 has fixed the problem.

Thanks.

RE: Incorrect consumer offsets after broker restart 0.11.0.0

Posted by Phil Luckhurst <ph...@encycle.com>.
Thanks, we'll try upgrading to 0.11.0.1 and see if it fixes the problem.

Is this the bug you are referring to?
https://issues.apache.org/jira/browse/KAFKA-5600

-----Original Message-----
From: Elyahou Ittah [mailto:elyahou.i@fiverr.com] 
Sent: 10 October 2017 13:41
To: users@kafka.apache.org
Subject: Re: Incorrect consumer offsets after broker restart 0.11.0.0

It is a known bug, fixed in 0.11.0.1

On Oct 10, 2017 15:20, "Phil Luckhurst" <ph...@encycle.com> wrote:

> We have a Kafka broker we use for testing that we have recently 
> updated from 0.9.0.1 to 0.11.0.0 and our java consumer is built using 
> the 0.11.0.0 client. The consumers manually commit offsets and are 
> consuming messages as expected since the upgrade. If we restart the 
> consumers they fetch the previously committed offset from the broker 
> and restart processing new messages as expected. Kafka Manager reports the offsets we expect to see.
> However, if we restart the broker the consumer receives an old offset 
> from the broker and we can end up re-processing several days' worth of messages.
>
> We have identified the __consumers_offset partition where the offsets 
> are being stored and if we use the console consumer to consume from 
> that partition we see a new message appear each time our consumer 
> commits its offsets. The commands we use are:
>
> echo "exclude.internal.topics=false" > /tmp/consumer.config 
> /opt/kafka/bin/kafka-console-consumer.sh --consumer.config 
> /tmp/consumer.config --formatter "kafka.coordinator.group.
> GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server
> localhost:9092 --topic __consumer_offsets --consumer-property group.id 
> =test-offsets-consumer-group --partition 43
>
> And the output shows our consumer group and topic partition for each 
> commit the consumer sends, the reported offset is correct.
>
> [ta-eng-cob1-tstat-events,ta-eng-cob1-ayla,0]::[OffsetMetadata[1833602
> ,NO_METADATA],CommitTime 1507632714328,ExpirationTime 1507719114328]
>
> We also used the following command to check that these commits also 
> trigger a new record to be written to the latest __consumer_offset_43 
> partition log file and we see a new record added to the partition log 
> file every time the consumer commits offsets.
>
> /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments 
> --print-data-log --files /var/lib/kafka/__consumer_offsets-43/
> 00000000000003780553.log
>
> baseOffset: 4006387 lastOffset: 4006387 baseSequence: 0 lastSequence: 
> 0
> producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 16 isTransactional:
> false position: 30359857 CreateTime: 1507632866696 isvalid: true size: 
> 147
> magic: 2 compresscodec: NONE crc: 1175188994
>
> Everything appears to be working as expected until we restart the 
> broker which then returns an old offset to the consumer.
>
> For example, in the consumer debug output we see the last commit 
> before the broker restart is 1828033
>
> 2017-10-09 11:55:16,056 DEBUG [pool-7-thread-2] org.apache.kafka.clients.
> consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events 
> committed offset 1828033 for partition ta-eng-cob1-ayla-0
>
> After we restart the broker we see the consumer receive an old offset 
> of
> 1791273 from the broker.
>
> 2017-10-09 11:57:22,735 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher:
> Resetting offset for partition ta-eng-cob1-ayla-0 to the committed 
> offset
> 1791273
>
> If we just restart the consumer the fetch returns the correct offset 
> information from the broker.
>
> 2017-10-09 11:52:25,984 DEBUG [pool-7-thread-2] org.apache.kafka.clients.
> consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events 
> committed offset 1828015 for partition ta-eng-cob1-ayla-0
> 2017-10-09 11:53:21,658 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher:
> Resetting offset for partition ta-eng-cob1-ayla-0 to the committed 
> offset
> 1828015
>
> There don't appear to be any errors in the broker logs to indicate a 
> problem, so the question is what is making the broker return the 
> incorrect offset when it is restarted?
>
> Thanks,
> Phil Luckhurst
>
>

Re: Incorrect consumer offsets after broker restart 0.11.0.0

Posted by Elyahou Ittah <el...@fiverr.com>.
It is a known bug, fixed in 0.11.0.1

On Oct 10, 2017 15:20, "Phil Luckhurst" <ph...@encycle.com> wrote:

> We have a Kafka broker we use for testing that we have recently updated
> from 0.9.0.1 to 0.11.0.0 and our java consumer is built using the 0.11.0.0
> client. The consumers manually commit offsets and are consuming messages as
> expected since the upgrade. If we restart the consumers they fetch the
> previously committed offset from the broker and restart processing new
> messages as expected. Kafka Manager reports the offsets we expect to see.
> However, if we restart the broker the consumer receives an old offset from
> the broker and we can end up re-processing several days' worth of messages.
>
> We have identified the __consumers_offset partition where the offsets are
> being stored and if we use the console consumer to consume from that
> partition we see a new message appear each time our consumer commits its
> offsets. The commands we use are:
>
> echo "exclude.internal.topics=false" > /tmp/consumer.config
> /opt/kafka/bin/kafka-console-consumer.sh --consumer.config
> /tmp/consumer.config --formatter "kafka.coordinator.group.
> GroupMetadataManager\$OffsetsMessageFormatter" --bootstrap-server
> localhost:9092 --topic __consumer_offsets --consumer-property group.id
> =test-offsets-consumer-group --partition 43
>
> And the output shows our consumer group and topic partition for each
> commit the consumer sends, the reported offset is correct.
>
> [ta-eng-cob1-tstat-events,ta-eng-cob1-ayla,0]::[OffsetMetadata[1833602,NO_METADATA],CommitTime
> 1507632714328,ExpirationTime 1507719114328]
>
> We also used the following command to check that these commits also
> trigger a new record to be written to the latest __consumer_offset_43
> partition log file and we see a new record added to the partition log file
> every time the consumer commits offsets.
>
> /opt/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments
> --print-data-log --files /var/lib/kafka/__consumer_offsets-43/
> 00000000000003780553.log
>
> baseOffset: 4006387 lastOffset: 4006387 baseSequence: 0 lastSequence: 0
> producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 16 isTransactional:
> false position: 30359857 CreateTime: 1507632866696 isvalid: true size: 147
> magic: 2 compresscodec: NONE crc: 1175188994
>
> Everything appears to be working as expected until we restart the broker
> which then returns an old offset to the consumer.
>
> For example, in the consumer debug output we see the last commit before
> the broker restart is 1828033
>
> 2017-10-09 11:55:16,056 DEBUG [pool-7-thread-2] org.apache.kafka.clients.
> consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events
> committed offset 1828033 for partition ta-eng-cob1-ayla-0
>
> After we restart the broker we see the consumer receive an old offset of
> 1791273 from the broker.
>
> 2017-10-09 11:57:22,735 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher:
> Resetting offset for partition ta-eng-cob1-ayla-0 to the committed offset
> 1791273
>
> If we just restart the consumer the fetch returns the correct offset
> information from the broker.
>
> 2017-10-09 11:52:25,984 DEBUG [pool-7-thread-2] org.apache.kafka.clients.
> consumer.internals.ConsumerCoordinator: Group ta-eng-cob1-tstat-events
> committed offset 1828015 for partition ta-eng-cob1-ayla-0
> 2017-10-09 11:53:21,658 DEBUG [pool-7-thread-2] org.apache.kafka.clients.consumer.internals.Fetcher:
> Resetting offset for partition ta-eng-cob1-ayla-0 to the committed offset
> 1828015
>
> There don't appear to be any errors in the broker logs to indicate a
> problem, so the question is what is making the broker return the incorrect
> offset when it is restarted?
>
> Thanks,
> Phil Luckhurst
>
>