Posted to users@kafka.apache.org by Timur Fayruzov <ti...@gmail.com> on 2016/10/19 23:36:57 UTC

Offset sporadically being reset

Hello,

I run a Kafka 0.8.2.2 cluster with 3 nodes and recently started to observe
strange behavior on select topics. The cluster runs in-house, as do most of
the consumers. I have started some consumers in AWS and they _mostly_ work
fine. Occasionally, I end up in a state where, when I run
kafka-consumer-offset-checker, I see that the offset of one partition goes
back and forth (e.g. it was 1000, then goes to 900, then goes to 1100, etc.)

The Kafka broker that is holding this partition has the following log messages:
{"@timestamp":"2016-10-19T21:00:00.134Z","@service":"kafka","thread":"kafka-request-handler-2","logger":"kafka.server.ReplicaManager","@host":"kafka-0","@category":"common","@msg":"[Replica
Manager on Broker 0]: Error when processing fetch request for partition
[my_topic,1] offset 337055698 from consumer with correlation id 0. Possible
cause: Request for offset 337055698 but we only have log segments in the
range 347392118 to 361407455.","@version":"1","@severity":"ERROR"}

{"@timestamp":"2016-10-19T21:00:00.168Z","@service":"kafka","exception":"java.io.IOException:
Broken pipe
at
sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:434)
at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:566)
at kafka.log.FileMessageSet.writeTo(FileMessageSet.scala:147)
at kafka.api.PartitionDataSend.writeTo(FetchResponse.scala:70)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.TopicDataSend.writeTo(FetchResponse.scala:125)
at kafka.network.MultiSend.writeTo(Transmission.scala:101)
at kafka.api.FetchResponseSend.writeTo(FetchResponse.scala:231)
at kafka.network.Processor.write(SocketServer.scala:472)
at kafka.network.Processor.run(SocketServer.scala:342)
at
java.lang.Thread.run(Thread.java:745)\n","thread":"kafka-network-thread-6667-0","logger":"kafka.network.Processor","@host":"kafka0.util.pages","@category":"common","@msg":"Closing
socket for /10.10.10.10 because of
error","@version":"1","@severity":"ERROR"}

The IP above is obscured, but it is the IP of the EC2 node that runs the
consumer for that partition.
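
The first error suggests the fetch is asking for an offset that is older than
the oldest segment the broker still retains. To compare the committed offset
reported by kafka-consumer-offset-checker against the broker's valid range,
something like the following 0.8 SimpleConsumer sketch can be used (host,
port, topic, and partition are illustrative placeholders taken from the logs
above, not necessarily my exact setup):

import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class OffsetRangeCheck {

    // Ask the broker for the single offset that corresponds to `whichTime`:
    // EarliestTime() = start of the oldest retained segment,
    // LatestTime()   = current end of the log.
    static long getOffset(SimpleConsumer consumer, String topic, int partition,
                          long whichTime, String clientId) {
        TopicAndPartition tp = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(tp, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            throw new RuntimeException("error fetching offsets: "
                    + response.errorCode(topic, partition));
        }
        return response.offsets(topic, partition)[0];
    }

    public static void main(String[] args) {
        // Placeholder broker/topic/partition.
        SimpleConsumer consumer = new SimpleConsumer(
                "kafka-0", 6667, 100000, 64 * 1024, "offset-range-check");
        try {
            long earliest = getOffset(consumer, "my_topic", 1,
                    kafka.api.OffsetRequest.EarliestTime(), "offset-range-check");
            long latest = getOffset(consumer, "my_topic", 1,
                    kafka.api.OffsetRequest.LatestTime(), "offset-range-check");
            System.out.println("valid offset range: " + earliest + " .. " + latest);
        } finally {
            consumer.close();
        }
    }
}

If the committed offset falls below the earliest value returned here, the
broker has already deleted that data, and with auto.offset.reset=smallest the
consumer will jump forward to the earliest available offset.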

I tried to reset the offset for the consumer group on that partition manually (I
wrote a script for that), but I still see it being reset to a prior point
(and back). It seems that after a while this behavior goes away and the
affected partitions have a chance to catch up, but then the whole thing
repeats.

My consumer configuration is:

"socket.timeout.ms": "60000",
"zookeeper.session.timeout.ms": "60000",
"offsets.channel.socket.timeout.ms": "30000"
"auto.offset.reset": "smallest"
"offsets.storage": "kafka"
"consumer.timeout.ms": "1500"

I use the reactive-kafka wrapper; other places where it is used do not have
these problems.

Please advise what this could be.

Thanks,
Timur

Re: Offset sporadically being reset

Posted by Timur Fayruzov <ti...@gmail.com>.
Additionally, on the consumer I observe strange behavior: it is constantly
rebalancing. There are no errors and each rebalance succeeds, but as soon as
one finishes the next one starts.
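
For context, these are the 0.8 high-level consumer settings that govern
rebalancing, with their stock defaults (my config above does not override the
rebalance.* ones; zookeeper.session.timeout.ms is the exception, set to 60000
in my case):

"rebalance.max.retries": "4",
"rebalance.backoff.ms": "2000",
"zookeeper.session.timeout.ms": "6000",
"zookeeper.sync.time.ms": "2000"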
