Posted to users@kafka.apache.org by siddharth ubale <si...@gmail.com> on 2014/09/04 09:04:42 UTC

Failed to find leader

Hi,

I am using a Kafka spout to read data into Storm 0.9.2.
Whenever I run my program and check the console consumer, I see an
error saying
*"Failed to add leader for partitions [Json1234,0]; will retry
(kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)"*
along with this log:
kafka.common.NotLeaderForPartitionException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.lang.Class.newInstance(Class.java:374)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:73)
    at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:160)
    at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:60)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:179)
    at kafka.server.AbstractFetcherThread$$anonfun$addPartitions$2.apply(AbstractFetcherThread.scala:174)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherThread.addPartitions(AbstractFetcherThread.scala:174)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:86)
    at kafka.server.AbstractFetcherManager$$anonfun$addFetcherForPartitions$2.apply(AbstractFetcherManager.scala:76)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at kafka.server.AbstractFetcherManager.addFetcherForPartitions(AbstractFetcherManager.scala:76)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:95)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)
^C[2014-09-04 12:27:45,186] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-09-04 12:27:45,223] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)
[2014-09-04 12:27:45,237] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

Has anyone encountered this issue?
I am using *kafka_2.10-0.8.1.1 and Storm 0.9.2-incubating with ZooKeeper
3.4.6.*

*Do let me know. Also, I see this very often whenever I have restarted my
topic.*

Thanks,
Siddharth ubale

Re: Failed to find leader

Posted by Neha Narkhede <ne...@gmail.com>.
If Storm 0.9.2 uses the SimpleConsumer, this could be a bug in the leader
fault tolerance logic in Storm itself. The high-level consumer
(ZookeeperConsumerConnector) handles this automatically for you.
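
To illustrate what handling this yourself involves for a client that talks
to brokers directly: on a not-leader error it has to look up the partition's
current leader again and retry against that broker. Below is a minimal Python
sketch of that pattern. It is a simulation only: FakeCluster, find_leader,
fetch, and fetch_with_leader_refresh are hypothetical names invented for
illustration, not the Kafka or Storm API.

```python
import time


class NotLeaderForPartition(Exception):
    """Raised by a (simulated) broker that no longer leads the partition."""


class FakeCluster:
    """Hypothetical stand-in for a cluster; not a real Kafka client API."""

    def __init__(self, leader):
        self.leader = leader  # id of the broker currently leading the partition

    def find_leader(self, topic, partition):
        # Stands in for a topic-metadata lookup.
        return self.leader

    def fetch(self, broker, topic, partition, offset):
        # Only the current leader may serve the fetch.
        if broker != self.leader:
            raise NotLeaderForPartition(f"broker {broker} is not the leader")
        return [f"message-{offset}"]


def fetch_with_leader_refresh(cluster, topic, partition, offset,
                              cached_leader, max_retries=3, backoff_s=0.0):
    """Fetch from the cached leader; on a not-leader error, look the
    leader up again and retry, giving up after max_retries refreshes."""
    leader = cached_leader
    for attempt in range(max_retries + 1):
        try:
            return leader, cluster.fetch(leader, topic, partition, offset)
        except NotLeaderForPartition:
            if attempt == max_retries:
                raise
            time.sleep(backoff_s)  # give a leader election time to finish
            leader = cluster.find_leader(topic, partition)


# Leadership has moved from broker 1 (stale cache) to broker 2.
cluster = FakeCluster(leader=2)
leader, messages = fetch_with_leader_refresh(
    cluster, "Json1234", 0, 0, cached_leader=1)
```

In the usage lines at the bottom, the first fetch goes to the stale leader and
fails, the leader is looked up again, and the retry succeeds against broker 2.
A real client would also want a non-zero backoff between attempts.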

On Thu, Sep 4, 2014 at 8:31 AM, Guozhang Wang <wa...@gmail.com> wrote:

> Hi Siddharth,
>
> The error log shows that the Kafka broker the consumer talks to is no
> longer hosting the leader replica of the topic-partition. Were there any
> broker failures during that time (you may find them in the server and
> controller logs)?
>
> Guozhang

Re: Failed to find leader

Posted by Guozhang Wang <wa...@gmail.com>.
Hi Siddharth,

The error log shows that the Kafka broker the consumer talks to is no
longer hosting the leader replica of the topic-partition. Were there any
broker failures during that time (you may find them in the server and
controller logs)?

Guozhang
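
One way to narrow down the broker logs is to keep only the lines stamped
close to the time of the consumer error. The following is a rough Python
sketch, assuming the broker logs use the same "[2014-09-04 12:27:45,186]"
timestamp prefix as the consumer output quoted in this thread. The function
name lines_near and its parameters are made up for illustration.

```python
import re
from datetime import datetime, timedelta

# Timestamp prefix as seen in the thread, e.g. "[2014-09-04 12:27:45,186]".
TIMESTAMP = re.compile(r"\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\]")


def lines_near(lines, when, window_s=60, keyword=None):
    """Return the log lines stamped within window_s seconds of `when`,
    optionally keeping only those that contain `keyword`."""
    window = timedelta(seconds=window_s)
    hits = []
    for line in lines:
        match = TIMESTAMP.search(line)
        if not match:
            continue  # continuation lines (e.g. stack frames) have no timestamp
        stamp = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
        if abs(stamp - when) <= window and (keyword is None or keyword in line):
            hits.append(line)
    return hits
```

For example, with a log file opened as f (the path depends on your
installation), lines_near(f, datetime(2014, 9, 4, 12, 27, 45),
keyword="Json1234") would keep only the lines within a minute of the error
that mention the topic. Note that this compares timestamps as written, so the
broker and consumer clocks and time zones need to agree.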