Posted to users@kafka.apache.org by Ryan Williams <rw...@gmail.com> on 2014/06/13 18:46:20 UTC

Recovering from consumer failure

I have a consumer program that hit an error while handling a message and
crashed. It commits offsets manually, and because it failed without
committing, it now fails on every restart: it keeps receiving the same bad
message. When this happened before, I successfully used the ExportZkOffsets
and ImportZkOffsets tools to advance the offset, but that is not working
this time (the export generates an empty file).

Here is what the debugging tools show. What else can I look at?

λ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic
Topic:mytopic      PartitionCount:2        ReplicationFactor:1     Configs:
        Topic: mytopic     Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: mytopic     Partition: 1    Leader: 0       Replicas: 0     Isr: 0

λ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --unavailable-partitions
[no results]

λ bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1
mytopic:0:0
mytopic:1:11

λ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --topic mytopic --zkconnect localhost:2181 --group mytopic_group
Group           Topic                          Pid Offset          logSize         Lag             Owner
Exception in thread "main" org.I0Itec.zkclient.exception.ZkNoNodeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mytopic_group/offsets/mytopic/0
        at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:47)
        at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:685)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:766)
        at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761)
        at kafka.utils.ZkUtils$.readData(ZkUtils.scala:461)
....

λ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
badmessage
{"goodmessage":"details"}
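
The failure loop described in this message (a handler that crashes before the manual commit, so the same message is delivered again on restart) can be sketched without Kafka at all. This is only an illustration, not the poster's code: `handle`, `consume`, and the in-memory `log` list are hypothetical names, the two payloads are copied from the console-consumer output above, and treating a message as bad when it fails JSON parsing is an assumption.

```python
import json


def handle(raw):
    """Hypothetical handler: assumes every valid message is a JSON document."""
    return json.loads(raw)


def consume(log, committed, skip_bad):
    """Process log[committed:] and return (new offset, results, skipped).

    `committed` stands in for the manually committed offset.
    """
    results, skipped = [], []
    offset = committed
    while offset < len(log):
        try:
            results.append(handle(log[offset]))
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            if not skip_bad:
                # Stop before committing: the next run starts at the same offset.
                return offset, results, skipped
            # Record the bad message and move on so the consumer can progress.
            skipped.append((offset, log[offset]))
        offset += 1  # the "commit" happens only after handling or skipping
    return offset, results, skipped


log = ["badmessage", '{"goodmessage":"details"}']
stuck = consume(log, 0, skip_bad=False)     # never gets past offset 0
recovered = consume(log, 0, skip_bad=True)  # skips offset 0, handles offset 1
```

With `skip_bad=False` the returned offset never moves past the bad message, which matches the repeated failure described above. With `skip_bad=True` the bad payload is set aside and the offset advances, so a restart would not see it again.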

Re: Recovering from consumer failure

Posted by Guozhang Wang <wa...@gmail.com>.
For some reason, your committed offsets are no longer in ZooKeeper (the
offset znode is missing):

org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /consumers/mytopic_group/offsets/mytopic/0

In this case, you may have to either read from the head of the log, which
will produce duplicates, or read from the tail of the log, which will
probably skip some messages.

Guozhang
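
The trade-off in this reply can be made concrete with a small sketch. It is a plain-Python model, not a Kafka API: `starting_offset` is a hypothetical helper, the policy names `smallest` and `largest` are chosen to mirror the `auto.offset.reset` values of the ZooKeeper-based high-level consumer, the log end offset 11 for partition 1 comes from the GetOffsetShell output in the thread, and the earliest offset of 0 is an assumption (the thread does not show it).

```python
def starting_offset(committed, earliest, latest, policy):
    """Pick where a consumer resumes when the committed offset may be missing.

    `committed` is None when no offset is stored for the partition.
    """
    if committed is not None:
        return committed
    if policy == "smallest":
        return earliest  # head of the log: everything is read again
    if policy == "largest":
        return latest    # tail of the log: existing messages are not read
    raise ValueError("unknown reset policy: " + policy)


# Partition 1: log end offset 11 (from the thread), earliest 0 (assumed).
earliest, latest = 0, 11

from_head = starting_offset(None, earliest, latest, "smallest")
from_tail = starting_offset(None, earliest, latest, "largest")

replayed = latest - from_head  # messages read again when starting at the head
unread = from_tail - earliest  # messages never read when starting at the tail
```

Starting at the head replays every message still in the log, including any that were already processed, while starting at the tail leaves all existing messages unread, including any that were never processed.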

