Posted to users@kafka.apache.org by Bhavesh Mistry <mi...@gmail.com> on 2014/11/01 06:07:35 UTC

Re: High Level Consumer Iterator IllegalStateException Issue

Hi Jun,

Here is the code:
https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java

Please let me know if you can help me determine the root cause. Why does
the iterator enter the illegal state, and why do the threads block?

Thanks,

Bhavesh

On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao <ju...@gmail.com> wrote:

> Do you have a simple test that can reproduce this issue?
>
> Thanks,
>
> Jun
>
> On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com>
> wrote:
>
> > Hi Jun,
> >
> > The consumer connector is not closed: I can see that the
> > ConsumerFetcherThread is alive but blocked on *put*, while hasNext() is
> > blocked on *take*. This is what I see after recovery.
> >
> >
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Another possibility is that the consumer connector is already closed
> > > and then you call hasNext() on the iterator.
> > >
> > > Thanks,
> > >
> > >
> > > Jun
> > >
> > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> > > mistry.p.bhavesh@gmail.com>
> > > wrote:
> > >
> > > > Hi Jun,
> > > >
> > > > hasNext() itself throws this error. I have to reset the state
> > > > manually; sometimes it recovers and sometimes it does not. Any other
> > > > clue?
> > > >
> > > >         public boolean hasNext() {
> > > >             LOG.info("called of  hasNext() :");
> > > >             int retry = 3;
> > > >             while(retry > 0){
> > > >                 try{
> > > >                     // this hasNext is blocking call..
> > > >                     boolean result = iterator.hasNext();
> > > >                     return result;
> > > >                 }catch(IllegalStateException exp){
> > > >                     iterator.resetState();
> > > >                     LOG.error("GOT IllegalStateException arg trying to recover....", exp);
> > > >                     retry--;
> > > >                 }
> > > >             }
> > > >             return false;
> > > >         }
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > The IllegalStateException typically happens if you call next()
> > > > > before hasNext() on the iterator.
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Jun
> > > > >
> > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> > > > > mistry.p.bhavesh@gmail.com
> > > > > > wrote:
> > > > >
> > > > > > Hi Neha,
> > > > > >
> > > > > > Thanks for your answer. Can you please let me know how I can
> > > > > > resolve the Iterator IllegalStateException? If this is a bug, I
> > > > > > can file one; otherwise, please let me know if it is specific to
> > > > > > my use case.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <
> > > > neha.narkhede@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > queued.max.message.chunks controls the consumer's fetcher queue.
> > > > > > >
> > > > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > > > > > mistry.p.bhavesh@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Neha,
> > > > > > > >
> > > > > > > > I think that if I solve problem 1, problem 2 will be solved too
> > > > > > > > (problem 1 is causing problem 2, the blocking). Can you please
> > > > > > > > let me know what controls the queue size for the
> > > > > > > > *ConsumerFetcherThread*?
> > > > > > > >
> > > > > > > >
> > > > > > > > Please see the attached Java source code, which reproduces the
> > > > > > > > problem. You may remove the recovery process. We have to do some
> > > > > > > > work before we start reading from the Kafka stream iterator, and
> > > > > > > > this seems to cause *java.lang.IllegalStateException: Iterator is
> > > > > > > > in failed state*.
> > > > > > > >
> > > > > > > > Please let me know your findings and recommendations.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Bhavesh
> > > > > > > >
> > > > > > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <
> > > > > > neha.narkhede@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > >> >> Sometime it give following exception.
> > > > > > > >>
> > > > > > > > >> It will help to have a more specific test case that reproduces the
> > > > > > > > >> failed iterator state.
> > > > > > > > >>
> > > > > > > > >> Also, the consumer threads block if the fetcher queue is full. The
> > > > > > > > >> queue can fill up if your consumer thread dies or slows down. I'd
> > > > > > > > >> recommend you ensure that all your consumer threads are alive. You can
> > > > > > > > >> take a thread dump to verify this.
> > > > > > > >>
> > > > > > > >> Thanks,
> > > > > > > >> Neha
> > > > > > > >>
> > > > > > > >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> > > > > > > >> mistry.p.bhavesh@gmail.com>
> > > > > > > >> wrote:
> > > > > > > >>
> > > > > > > >> > Hi Neha,
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > > >> > I have two problems. Any help is greatly appreciated.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > 1) *java.lang.IllegalStateException: Iterator is in failed state*
> > > > > > > > >> >
> > > > > > > > >> >         ConsumerConnector consumerConnector = Consumer
> > > > > > > > >> >                 .createJavaConsumerConnector(getConsumerConfig());
> > > > > > > > >> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > > > > > >> >         topicCountMap.put(topic, 32);
> > > > > > > > >> >         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap = consumerConnector
> > > > > > > > >> >                 .createMessageStreams(topicCountMap);
> > > > > > > > >> >
> > > > > > > > >> >         List<KafkaStream<byte[], byte[]>> streams =
> > > > > > > > >> >                 Collections.synchronizedList(topicStreamMap.get(topic));
> > > > > > > > >> >
> > > > > > > > >> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> > > > > > > > >> >
> > > > > > > > >> >         Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> > > > > > > > >> >         // remove the head first list for this source...rest are for the Dynamic Source...
> > > > > > > > >> >         mainIterator = iterator.next().iterator();
> > > > > > > > >> >
> > > > > > > > >> >         List<ConsumerIterator<byte[], byte[]>> iteratorList =
> > > > > > > > >> >                 new ArrayList<ConsumerIterator<byte[],byte[]>>(streams.size());
> > > > > > > > >> >         // now rest of the iterator must be registered now..
> > > > > > > > >> >         while(iterator.hasNext()){
> > > > > > > > >> >             iteratorList.add(iterator.next().iterator());
> > > > > > > > >> >         }
> > > > > > > > >> >         KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);
> > > > > > > >> >
> > > > > > > > >> > Once the consumer iterators are created and registered, we use them in
> > > > > > > > >> > another thread to start reading. Sometimes this gives the following
> > > > > > > > >> > exception.
> > > > > > > > >> >
> > > > > > > > >> > 24 Oct 2014 16:03:25,923 ERROR [SourceReader:request_source:LogStreamKafkaSource1] (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during reads. Swallowed to continue next read.
> > > > > > > > >> > java.lang.IllegalStateException: Iterator is in failed state
> > > > > > > > >> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I have tried to recover from this state by calling
> > > > > > > > >> > iterator.resetState(), but it does not always recover.
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > > >> > *2) ConsumerFetcherThreads are blocked on enqueue. What controls the
> > > > > > > > >> > size of the queue? Why are they blocked?* Because of this our lag is
> > > > > > > > >> > increasing, and our threads are blocked on hasNext()...
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
> > > > > > > >> > prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on
> > condition
> > > > > > > >> > [0x0000000116379000]
> > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > >> >         - parking to wait for  <0x0000000704019388> (a
> > > > > > > >> >
> > > > > >
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > >
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > >
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > >
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > >
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > >
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > >> >         at
> > > > > > > >>
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
> > > > > > > >> > prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on
> > condition
> > > > > > > >> > [0x0000000116276000]
> > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > >> >         - parking to wait for  <0x0000000704064ce0> (a
> > > > > > > >> >
> > > > > >
> > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > >
> > > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > >
> > > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > >
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > >> >         at
> > > > > > > >> >
> > > > > > > >>
> > > > > > >
> > > > >
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > >> >         at
> > > > > > > >>
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > Thanks,
> > > > > > > >> >
> > > > > > > >> > Bhavesh
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> >
> > > > > > > >> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <
> > > > > > > neha.narkhede@gmail.com
> > > > > > > >> >
> > > > > > > >> > wrote:
> > > > > > > >> >
> > > > > > > >> > > Can you provide the steps to reproduce this issue?
> > > > > > > >> > >
> > > > > > > >> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> > > > > > > >> > > mistry.p.bhavesh@gmail.com>
> > > > > > > >> > > wrote:
> > > > > > > >> > >
> > > > > > > >> > > > I am using one from the Kafka Trunk branch.
> > > > > > > >> > > >
> > > > > > > >> > > > Thanks,
> > > > > > > >> > > >
> > > > > > > >> > > > Bhavesh
> > > > > > > >> > > >
> > > > > > > >> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <
> > > > > > > >> > neha.narkhede@gmail.com>
> > > > > > > >> > > > wrote:
> > > > > > > >> > > >
> > > > > > > >> > > > > Which version of Kafka are you using on the
> consumer?
> > > > > > > >> > > > >
> > > > > > > >> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> > > > > > > >> > > > > mistry.p.bhavesh@gmail.com>
> > > > > > > >> > > > > wrote:
> > > > > > > >> > > > >
> > > > > > > > >> > > > > > Hi Kafka Community,
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > I am using the Kafka trunk source code and I get the following
> > > > > > > > >> > > > > > exception. What could cause the iterator to be in the FAILED state?
> > > > > > > > >> > > > > > Please let me know how I can fix this issue.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
> > > > > > > > >> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Here are the properties:
> > > > > > > >> > > > > >
> > > > > > > > >> > > > > >         Properties props = new Properties();
> > > > > > > > >> > > > > >         props.put("zookeeper.connect", zkConnect);
> > > > > > > > >> > > > > >         props.put("group.id", groupId);
> > > > > > > > >> > > > > >         props.put("consumer.timeout.ms", "-1");
> > > > > > > > >> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
> > > > > > > > >> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
> > > > > > > > >> > > > > >         props.put("auto.commit.interval.ms", "2000");
> > > > > > > > >> > > > > >         props.put("rebalance.max.retries", "8");
> > > > > > > > >> > > > > >         props.put("auto.offset.reset", "largest");
> > > > > > > > >> > > > > >         props.put("fetch.message.max.bytes","2097152");
> > > > > > > > >> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
> > > > > > > > >> > > > > >         props.put("auto.commit.enable","true");
> > > > > > > >> > > > > >
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Thanks,
> > > > > > > >> > > > > >
> > > > > > > >> > > > > > Bhavesh
> > > > > > > >> > > > > >
> > > > > > > >> > > > >
> > > > > > > >> > > >
> > > > > > > >> > >
> > > > > > > >> >
> > > > > > > >>
> > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: High Level Consumer Iterator IllegalStateException Issue

Posted by Bhavesh Mistry <mi...@gmail.com>.
Hi Neha and Jun,

I have fixed the issue on my side, based on what Jun mentioned: "next()
gives IllegalStateException if hasNext is not called...". After further
debugging, I found it was my mistake: I was sharing the same consumer
iterator across multiple threads. I forgot to call iterator.remove() in my
registry, so every thread was using the first consumer iterator. Because
multiple threads shared that one iterator, the exception occurred
intermittently.
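
Why a shared iterator fails only intermittently can be illustrated with a small, self-contained model. This is a sketch under assumptions, not Kafka's code: `BlockingIterator` is a hypothetical Java stand-in, and it assumes the iterator marks itself FAILED while it is still computing the next item (here, while blocked on an empty queue). A second thread that calls hasNext() during that window sees the failed state:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

class SharedIteratorDemo {
    enum State { NOT_READY, READY, FAILED }

    // Hypothetical, simplified stand-in for a blocking consumer iterator.
    static class BlockingIterator {
        private final BlockingQueue<String> queue;
        // Released once a caller is about to block while computing the next item.
        final CountDownLatch computing = new CountDownLatch(1);
        private volatile State state = State.NOT_READY;
        private volatile String nextItem;

        BlockingIterator(BlockingQueue<String> queue) {
            this.queue = queue;
        }

        boolean hasNext() {
            if (state == State.FAILED) {
                throw new IllegalStateException("Iterator is in failed state");
            }
            if (state == State.READY) {
                return true;
            }
            // Assumption: the state stays FAILED until the next item arrives.
            state = State.FAILED;
            computing.countDown();
            try {
                nextItem = queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
            state = State.READY;
            return true;
        }
    }

    // Returns true if a second caller hits IllegalStateException on a shared iterator.
    static boolean secondCallerFails() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        BlockingIterator shared = new BlockingIterator(queue);
        Thread first = new Thread(shared::hasNext);
        first.start();
        boolean failed = false;
        try {
            shared.computing.await(); // the first thread is now inside hasNext()
            try {
                shared.hasNext();
            } catch (IllegalStateException e) {
                failed = true;
            }
            queue.put("message"); // unblock the first thread
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return failed;
    }

    public static void main(String[] args) {
        System.out.println("second caller failed: " + secondCallerFails());
    }
}
```

In this model the failure depends on whether the second call lands while the first is still waiting, which matches the intermittent behaviour seen with a shared iterator. With one iterator per thread, no caller can observe another caller's in-progress state.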

This issue is resolved. Thank you very much for your support.
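
The fix described above, giving each thread its own iterator, can be sketched as a registry that removes an iterator at the moment a thread claims it. The names `IteratorRegistry`, `register`, and `claim` are hypothetical and are not the registry class from the original code; plain `java.util.Iterator` stands in for the Kafka consumer iterator:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class IteratorRegistryDemo {
    // Hypothetical registry: every registered iterator can be claimed exactly once.
    static class IteratorRegistry<T> {
        private final Queue<Iterator<T>> unclaimed = new ConcurrentLinkedQueue<>();

        void register(List<Iterator<T>> iterators) {
            unclaimed.addAll(iterators);
        }

        // Removes and returns an unclaimed iterator, or null when none are left.
        // poll() is atomic, so two threads can never receive the same iterator.
        Iterator<T> claim() {
            return unclaimed.poll();
        }
    }

    // Returns true if successive claims hand out different iterators.
    static boolean claimsAreDistinct() {
        IteratorRegistry<String> registry = new IteratorRegistry<>();
        Iterator<String> a = Arrays.asList("a1", "a2").iterator();
        Iterator<String> b = Arrays.asList("b1").iterator();
        registry.register(Arrays.asList(a, b));

        Iterator<String> first = registry.claim();
        Iterator<String> second = registry.claim();
        Iterator<String> third = registry.claim();
        return first == a && second == b && third == null;
    }

    public static void main(String[] args) {
        System.out.println("claims are distinct: " + claimsAreDistinct());
    }
}
```

Each reader thread would call `claim()` once at startup and keep the returned iterator for itself; a null result means there are more threads than streams.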

Thanks,

Bhavesh

On Mon, Nov 3, 2014 at 4:35 PM, Jun Rao <ju...@gmail.com> wrote:

> Bhavesh,
>
> That example has a lot of code. Could you provide a simpler test that
> demonstrates the problem?
>
> Thanks,
>
> Jun
>
> On Fri, Oct 31, 2014 at 10:07 PM, Bhavesh Mistry <
> mistry.p.bhavesh@gmail.com
> > wrote:
>
> > Hi Jun,
> >
> > Here is code base:
> >
> >
> https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java
> >
> > Please let me know if you can help me determine  the root cause.   Why
> > there is illegal state and blocking ?
> >
> > Thanks,
> >
> > Bhavesh
> >
> > On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Do you have a simple test that can reproduce this issue?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > > On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <
> > > mistry.p.bhavesh@gmail.com>
> > > wrote:
> > >
> > > > HI Jun,
> > > >
> > > > Consumer Connector is not closed because I can see the
> ConsumerFetcher
> > > > Thread alive but Blocked on *put* and hasNext() is blocked on *take*.
> > > > This is what I see after recovery.
> > > >
> > > >
> > > >
> > > > Thanks,
> > > >
> > > > Bhavesh
> > > >
> > > > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao <ju...@gmail.com> wrote:
> > > >
> > > > > Another possibility is that the consumer connector is already
> closed
> > > and
> > > > > then you call hasNext() on the iterator.
> > > > >
> > > > > Thanks,
> > > > >
> > > > >
> > > > > Jun
> > > > >
> > > > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> > > > > mistry.p.bhavesh@gmail.com>
> > > > > wrote:
> > > > >
> > > > > > Hi Jun,
> > > > > >
> > > > > > The hasNext() itself throws this error.  I have to manually reset
> > > state
> > > > > and
> > > > > > sometime it is able to recover and other it is not. Any other
> clue
> > ?
> > > > > >
> > > > > >         public boolean hasNext() {
> > > > > >             LOG.info("called of  hasNext() :");
> > > > > >             int retry = 3;
> > > > > >             while(retry > 0){
> > > > > >                 try{
> > > > > >                     // this hasNext is blocking call..
> > > > > >                     boolean result = iterator.hasNext();
> > > > > >                     return result;
> > > > > >                 }catch(IllegalStateException exp){
> > > > > >                     iterator.resetState();
> > > > > >                     LOG.error("GOT IllegalStateException arg
> trying
> > > to
> > > > > > recover....", exp);
> > > > > >                     retry--;
> > > > > >                 }
> > > > > >             }
> > > > > >             return false;
> > > > > >         }
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Bhavesh
> > > > > >
> > > > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao <ju...@gmail.com>
> wrote:
> > > > > >
> > > > > > > The IllegalStateException typically happens if you call next()
> > > before
> > > > > > > hasNext() on the iterator.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Jun
> > > > > > >
> > > > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <
> > > > > > > mistry.p.bhavesh@gmail.com
> > > > > > > > wrote:
> > > > > > >
> > > > > > > > Hi Neha,
> > > > > > > >
> > > > > > > > Thanks for your answer.  Can you please let me know how I can
> > > > resolve
> > > > > > the
> > > > > > > > Iterator IllegalStateException ?  I would appreciate your is
> > this
> > > > is
> > > > > > bug
> > > > > > > I
> > > > > > > > can file one or let me know if this is use case specific ?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Bhavesh
> > > > > > > >
> > > > > > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <
> > > > > > neha.narkhede@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > queued.max.message.chunks controls the consumer's fetcher
> > > queue.
> > > > > > > > >
> > > > > > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > > > > > > > mistry.p.bhavesh@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > HI Neha,
> > > > > > > > > >
> > > > > > > > > > If I solved the problem number 1 think and number 2 will
> be
> > > > > solved
> > > > > > > > (prob
> > > > > > > > > > 1 is causing problem number 2(blocked)).  Can you please
> > let
> > > me
> > > > > > know
> > > > > > > > what
> > > > > > > > > > controls the queue size for *ConsumerFetcherThread*
> thread
> > ?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > Please see the attached java source code which will
> > reproduce
> > > > the
> > > > > > > > > > problem.  You may remove the recovery process...  Please
> > > check.
> > > > > We
> > > > > > > > have
> > > > > > > > > to
> > > > > > > > > > do some work before we start reading from Kafka Stream
> > > > Interator
> > > > > > and
> > > > > > > > this
> > > > > > > > > > seems to cause some issue with java.lang.
> > > > > > > > > > IllegalStateException: Iterator is in failed state*.
> > > > > > > > > >
> > > > > > > > > > Please let me know your finding and recommendation.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Bhavesh
> > > > > > > > > >
> > > > > > > > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <
> > > > > > > > neha.narkhede@gmail.com>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > >> >> Sometime it give following exception.
> > > > > > > > > >>
> > > > > > > > > >> It will help to have a more specific test case that
> > > reproduces
> > > > > the
> > > > > > > > > failed
> > > > > > > > > >> iterator state.
> > > > > > > > > >>
> > > > > > > > > >> Also, the consumer threads block if the fetcher queue is
> > > full.
> > > > > The
> > > > > > > > queue
> > > > > > > > > >> can fill up if your consumer thread dies or slows down.
> > I'd
> > > > > > > recommend
> > > > > > > > > you
> > > > > > > > > >> ensure that all your consumer threads are alive. You can
> > > take
> > > > a
> > > > > > > thread
> > > > > > > > > >> dump
> > > > > > > > > >> to verify this.
> > > > > > > > > >>
> > > > > > > > > >> Thanks,
> > > > > > > > > >> Neha
> > > > > > > > > >>
> > > > > > > > > >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> > > > > > > > > >> mistry.p.bhavesh@gmail.com>
> > > > > > > > > >> wrote:
> > > > > > > > > >>
> > > > > > > > > >> > Hi Neha,
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > I have two problems:.  Any help is greatly
> appreciated.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > 1)* java.lang.IllegalStateException: Iterator is in
> > failed
> > > > > > state*
> > > > > > > > > >> >
> > > > > > > > > >> >        ConsumerConnector  consumerConnector = Consumer
> > > > > > > > > >> >
> > > > > >  .createJavaConsumerConnector(getConsumerConfig());
> > > > > > > > > >> >         Map<String, Integer> topicCountMap = new
> > > > > HashMap<String,
> > > > > > > > > >> > Integer>();
> > > > > > > > > >> >         topicCountMap.put(topic, *32*);
> > > > > > > > > >> >         Map<String, List<KafkaStream<byte[], byte[]>>>
> > > > > > > > topicStreamMap
> > > > > > > > > =
> > > > > > > > > >> > consumerConnector
> > > > > > > > > >> >                 .createMessageStreams(topicCountMap);
> > > > > > > > > >> >
> > > > > > > > > >> >         List<KafkaStream<byte[], byte[]>> streams =
> > > > > > > > > >> >
> Collections.synchronizedList(topicStreamMap.get(topic));
> > > > > > > > > >> >
> > > > > > > > > >> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> > > > > > > > > >> >
> > > > > > > > > >> >         Iterator<KafkaStream<byte[], byte[]>>
> iterator =
> > > > > > > > > >> > streams.iterator();
> > > > > > > > > >> >         // remove the head first list for this
> > > source...rest
> > > > > are
> > > > > > > for
> > > > > > > > > the
> > > > > > > > > >> > Dynamic Souce...
> > > > > > > > > >> >         mainIterator = iterator.next().iterator();
> > > > > > > > > >> >
> > > > > > > > > >> >         List<ConsumerIterator<byte[], byte[]>>
> > > iteratorList
> > > > =
> > > > > > new
> > > > > > > > > >> >
> > > ArrayList<ConsumerIterator<byte[],byte[]>>(streams.size());
> > > > > > > > > >> >         // now rest of the iterator must be registered
> > > now..
> > > > > > > > > >> >         while(iterator.hasNext()){
> > > > > > > > > >> >
> >  iteratorList.add(iterator.next().iterator());
> > > > > > > > > >> >         }
> > > > > > > > > >> >
> > >  *KafkaStreamRegistory.registerStream(mainSourceName,
> > > > > > > > > >> > iteratorList);*
> > > > > > > > > >> >
> > > > > > > > > >> > Once the Consumer iterator is created and registered.
> > We
> > > > use
> > > > > > this
> > > > > > > > in
> > > > > > > > > >> > another thread to start reading from the Consumer
> > > Iterator.
> > > > > > > >  Sometime
> > > > > > > > > >> it
> > > > > > > > > >> > give following exception.
> > > > > > > > > >> >
> > > > > > > > > >> > 24 Oct 2014 16:03:25,923 ERROR
> > > > > > > > > >> > [SourceReader:request_source:LogStreamKafkaSource1]
> > > > > > > > > >> > (grizzled.slf4j.Logger.error:116)  - SourceThread:
> > > exception
> > > > > > > during
> > > > > > > > > >> reads.
> > > > > > > > > >> > Swallowed to continue next read.
> > > > > > > > > >> > java.lang.IllegalStateException: Iterator is in failed
> > > state
> > > > > > > > > >> >     at
> > > > > > > >
> kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > I have tried to recover from this state by using this:
> > > > > > > > > >> > iterator.resetState(); but it does not recover
> sometime.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > *2) ConsumerFetcherThread are blocked on enqueue ?
> What
> > > > > > controls
> > > > > > > > size
> > > > > > > > > >> of
> > > > > > > > > >> > queue ? Why are they blocked ?  *Due to this our lags
> > are
> > > > > > > > increasing.
> > > > > > > > > >> our
> > > > > > > > > >> > threads blocked on hasNext()...
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
> > > > > > > > > >> > prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on
> > > > condition
> > > > > > > > > >> > [0x0000000116379000]
> > > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > > >> >         - parking to wait for  <0x0000000704019388> (a
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > > >> >         at
> > > > > > > > > >>
> > > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
> > > > > > > > > >> > prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on
> > > > condition
> > > > > > > > > >> > [0x0000000116276000]
> > > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > > >> >         - parking to wait for  <0x0000000704064ce0> (a
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > >
> > > > >
> > scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > > >> >         at
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > >
> > > > > > >
> > > > >
> > >
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > > >> >         at
> > > > > > > > > >>
> > > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > Thanks,
> > > > > > > > > >> >
> > > > > > > > > >> > Bhavesh
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > > >> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > > >> >
> > > > > > > > > >> > > Can you provide the steps to reproduce this issue?
> > > > > > > > > >> > >
> > > > > > > > > >> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> > > > > > > > > >> > > mistry.p.bhavesh@gmail.com>
> > > > > > > > > >> > > wrote:
> > > > > > > > > >> > >
> > > > > > > > > >> > > > I am using one from the Kafka Trunk branch.
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Thanks,
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > Bhavesh
> > > > > > > > > >> > > >
> > > > > > > > > > >> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > > > >> > > >
> > > > > > > > > > >> > > > > Which version of Kafka are you using on the consumer?
> > > > > > > > > > >> > > > >
> > > > > > > > > > >> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > > > > HI Kafka Community ,
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > I am using kafka trunk source code and I get following exception.  What
> > > > > > > > > > >> > > > > > could cause the iterator to have FAILED state.  Please let me know how I
> > > > > > > > > > >> > > > > > can fix this issue.
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
> > > > > > > > > > >> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > > > > > > >> > > > > > Here is Properties:
> > > > > > > > > >> > > > > >
> > > > > > > > > > >> > > > > >         Properties props = new Properties();
> > > > > > > > > > >> > > > > >         props.put("zookeeper.connect", zkConnect);
> > > > > > > > > > >> > > > > >         props.put("group.id", groupId);
> > > > > > > > > > >> > > > > > *        props.put("consumer.timeout.ms", "-1");*
> > > > > > > > > > >> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
> > > > > > > > > > >> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
> > > > > > > > > > >> > > > > >         props.put("auto.commit.interval.ms", "2000");
> > > > > > > > > > >> > > > > >         props.put("rebalance.max.retries", "8");
> > > > > > > > > > >> > > > > >         props.put("auto.offset.reset", "largest");
> > > > > > > > > > >> > > > > >         props.put("fetch.message.max.bytes","2097152");
> > > > > > > > > > >> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
> > > > > > > > > > >> > > > > >         props.put("auto.commit.enable","true");
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Thanks,
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > Bhavesh
> > > > > > > > > >> > > > > >
> > > > > > > > > >> > > > >
> > > > > > > > > >> > > >
> > > > > > > > > >> > >
> > > > > > > > > >> >
> > > > > > > > > >>
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
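
For readers following the thread, here is a minimal sketch of how an iterator built around a small state machine can end up throwing "Iterator is in failed state" from hasNext() itself, and why a reset can bring it back. This is a simplified, hypothetical model written for illustration (ToyIterator, failNextFetch and demo are made-up names). It is not Kafka's IteratorTemplate source, and whether the same sequence of events occurs in the reporter's application is an assumption.

```java
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
import java.util.Queue;

// Simplified, hypothetical model of a state-machine iterator. NOT Kafka's actual class.
class FailedStateSketch {
    enum State { NOT_READY, READY, DONE, FAILED }

    /** Toy iterator whose fetch step can be forced to throw once. */
    static class ToyIterator {
        private State state = State.NOT_READY;
        private final Queue<String> source = new ArrayDeque<>();
        private String nextItem;
        boolean failNextFetch = false; // hook to simulate an aborted fetch

        ToyIterator(String... items) {
            for (String s : items) {
                source.add(s);
            }
        }

        boolean hasNext() {
            if (state == State.FAILED) {
                throw new IllegalStateException("Iterator is in failed state");
            }
            if (state == State.DONE) return false;
            if (state == State.READY) return true;
            return maybeComputeNext();
        }

        String next() {
            if (!hasNext()) throw new NoSuchElementException();
            state = State.NOT_READY;
            return nextItem;
        }

        private boolean maybeComputeNext() {
            // Mark FAILED up front: if the fetch below throws, the state stays FAILED.
            state = State.FAILED;
            if (failNextFetch) {
                failNextFetch = false;
                throw new RuntimeException("simulated fetch failure");
            }
            String item = source.poll();
            if (item == null) {
                state = State.DONE;
                return false;
            }
            nextItem = item;
            state = State.READY;
            return true;
        }

        void resetState() {
            state = State.NOT_READY;
        }
    }

    /** Runs the scenario and returns a short trace of what happened. */
    static String demo() {
        ToyIterator it = new ToyIterator("a", "b");
        StringBuilder trace = new StringBuilder();
        it.failNextFetch = true;
        try { it.hasNext(); } catch (RuntimeException e) { trace.append("fetch-failed;"); }
        try { it.hasNext(); } catch (IllegalStateException e) { trace.append("illegal-state;"); }
        it.resetState(); // after the reset, iteration continues normally
        trace.append(it.next()).append(';').append(it.next()).append(';');
        trace.append(it.hasNext());
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints fetch-failed;illegal-state;a;b;false. In this model the first exception (the one that left the iterator in the failed state) is the interesting one, so logging it rather than only the later IllegalStateException may help narrow down a root cause.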

Re: High Level Consumer Iterator IllegalStateException Issue

Posted by Jun Rao <ju...@gmail.com>.
Bhavesh,

That example has a lot of code. Could you provide a simpler test that
demonstrates the problem?

Thanks,

Jun

On Fri, Oct 31, 2014 at 10:07 PM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:

> Hi Jun,
>
> Here is code base:
>
> https://github.com/bmistry13/kafka-trunk-producer/blob/master/KafkaConsumerWithDelay.java
>
> Please let me know if you can help me determine  the root cause.   Why
> there is illegal state and blocking ?
>
> Thanks,
>
> Bhavesh
>
> On Fri, Oct 31, 2014 at 8:33 AM, Jun Rao <ju...@gmail.com> wrote:
>
> > Do you have a simple test that can reproduce this issue?
> >
> > Thanks,
> >
> > Jun
> >
> > On Thu, Oct 30, 2014 at 8:34 PM, Bhavesh Mistry <
> > mistry.p.bhavesh@gmail.com>
> > wrote:
> >
> > > HI Jun,
> > >
> > > Consumer Connector is not closed because I can see the ConsumerFetcher
> > > Thread alive but Blocked on *put* and hasNext() is blocked on *take*.
> > > This is what I see after recovery.
> > >
> > >
> > >
> > > Thanks,
> > >
> > > Bhavesh
> > >
> > > On Thu, Oct 30, 2014 at 11:42 AM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > > > Another possibility is that the consumer connector is already closed
> > and
> > > > then you call hasNext() on the iterator.
> > > >
> > > > Thanks,
> > > >
> > > >
> > > > Jun
> > > >
> > > > On Wed, Oct 29, 2014 at 9:06 PM, Bhavesh Mistry <
> > > > mistry.p.bhavesh@gmail.com>
> > > > wrote:
> > > >
> > > > > Hi Jun,
> > > > >
> > > > > > The hasNext() itself throws this error.  I have to manually reset state
> > > > > > and sometimes it is able to recover and other times it is not. Any other
> > > > > > clue ?
> > > > >
> > > > >         public boolean hasNext() {
> > > > >             LOG.info("called of  hasNext() :");
> > > > >             int retry = 3;
> > > > >             while(retry > 0){
> > > > >                 try{
> > > > >                     // this hasNext is blocking call..
> > > > >                     boolean result = iterator.hasNext();
> > > > >                     return result;
> > > > >                 }catch(IllegalStateException exp){
> > > > >                     iterator.resetState();
> > > > > >                     LOG.error("GOT IllegalStateException arg trying to recover....", exp);
> > > > >                     retry--;
> > > > >                 }
> > > > >             }
> > > > >             return false;
> > > > >         }
> > > > >
> > > > > Thanks,
> > > > >
> > > > > Bhavesh
> > > > >
> > > > > On Wed, Oct 29, 2014 at 6:36 PM, Jun Rao <ju...@gmail.com> wrote:
> > > > >
> > > > > > > The IllegalStateException typically happens if you call next() before
> > > > > > > hasNext() on the iterator.
> > > > > >
> > > > > > Thanks,
> > > > > >
> > > > > > Jun
> > > > > >
> > > > > > > On Tue, Oct 28, 2014 at 10:50 AM, Bhavesh Mistry <mistry.p.bhavesh@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Neha,
> > > > > > >
> > > > > > > > Thanks for your answer.  Can you please let me know how I can resolve the
> > > > > > > > Iterator IllegalStateException ?  I would appreciate your help. If this is
> > > > > > > > a bug, I can file one; or let me know if this is use case specific.
> > > > > > >
> > > > > > > Thanks,
> > > > > > >
> > > > > > > Bhavesh
> > > > > > >
> > > > > > > > On Tue, Oct 28, 2014 at 9:30 AM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > queued.max.message.chunks controls the consumer's fetcher queue.
> > > > > > > >
> > > > > > > > On Mon, Oct 27, 2014 at 9:32 PM, Bhavesh Mistry <
> > > > > > > > mistry.p.bhavesh@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > HI Neha,
> > > > > > > > >
> > > > > > > > > > I think that if problem number 1 is solved, number 2 will be solved too
> > > > > > > > > > (problem 1 is causing problem number 2, the blocking).  Can you please let
> > > > > > > > > > me know what controls the queue size for the *ConsumerFetcherThread* thread ?
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > Please see the attached java source code which will reproduce the
> > > > > > > > > > problem.  You may remove the recovery process...  Please check.  We have
> > > > > > > > > > to do some work before we start reading from the Kafka Stream Iterator, and
> > > > > > > > > > this seems to cause some issue with *java.lang.IllegalStateException:
> > > > > > > > > > Iterator is in failed state*.
> > > > > > > > >
> > > > > > > > > Please let me know your finding and recommendation.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > >
> > > > > > > > > Bhavesh
> > > > > > > > >
> > > > > > > > > > On Mon, Oct 27, 2014 at 6:24 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > >
> > > > > > > > >> >> Sometime it give following exception.
> > > > > > > > >>
> > > > > > > > > >> It will help to have a more specific test case that reproduces the failed
> > > > > > > > > >> iterator state.
> > > > > > > > > >>
> > > > > > > > > >> Also, the consumer threads block if the fetcher queue is full. The queue
> > > > > > > > > >> can fill up if your consumer thread dies or slows down. I'd recommend you
> > > > > > > > > >> ensure that all your consumer threads are alive. You can take a thread dump
> > > > > > > > > >> to verify this.
> > > > > > > > >>
> > > > > > > > >> Thanks,
> > > > > > > > >> Neha
> > > > > > > > >>
> > > > > > > > >> On Mon, Oct 27, 2014 at 2:14 PM, Bhavesh Mistry <
> > > > > > > > >> mistry.p.bhavesh@gmail.com>
> > > > > > > > >> wrote:
> > > > > > > > >>
> > > > > > > > >> > Hi Neha,
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > > >> > I have two problems.  Any help is greatly appreciated.
> > > > > > > > > >> >
> > > > > > > > > >> >
> > > > > > > > > >> > 1)* java.lang.IllegalStateException: Iterator is in failed state*
> > > > > > > > >> >
> > > > > > > > > >> >         ConsumerConnector consumerConnector = Consumer
> > > > > > > > > >> >                 .createJavaConsumerConnector(getConsumerConfig());
> > > > > > > > > >> >         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> > > > > > > > > >> >         topicCountMap.put(topic, *32*);
> > > > > > > > > >> >         Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamMap =
> > > > > > > > > >> >                 consumerConnector.createMessageStreams(topicCountMap);
> > > > > > > > > >> >
> > > > > > > > > >> >         List<KafkaStream<byte[], byte[]>> streams =
> > > > > > > > > >> >                 Collections.synchronizedList(topicStreamMap.get(topic));
> > > > > > > > > >> >
> > > > > > > > > >> >         AppStaticInfo info = Mupd8Main.STATICINFO();
> > > > > > > > > >> >
> > > > > > > > > >> >         Iterator<KafkaStream<byte[], byte[]>> iterator = streams.iterator();
> > > > > > > > > >> >         // remove the head first list for this source...rest are for the Dynamic Source...
> > > > > > > > > >> >         mainIterator = iterator.next().iterator();
> > > > > > > > > >> >
> > > > > > > > > >> >         List<ConsumerIterator<byte[], byte[]>> iteratorList =
> > > > > > > > > >> >                 new ArrayList<ConsumerIterator<byte[],byte[]>>(streams.size());
> > > > > > > > > >> >         // now rest of the iterator must be registered now..
> > > > > > > > > >> >         while(iterator.hasNext()){
> > > > > > > > > >> >             iteratorList.add(iterator.next().iterator());
> > > > > > > > > >> >         }
> > > > > > > > > >> >         *KafkaStreamRegistory.registerStream(mainSourceName, iteratorList);*
> > > > > > > > >> >
> > > > > > > > > >> > Once the Consumer iterator is created and registered, we use it in
> > > > > > > > > >> > another thread to start reading from the Consumer Iterator.  Sometimes it
> > > > > > > > > >> > gives the following exception.
> > > > > > > > >> >
> > > > > > > > > >> > 24 Oct 2014 16:03:25,923 ERROR
> > > > > > > > > >> > [SourceReader:request_source:LogStreamKafkaSource1]
> > > > > > > > > >> > (grizzled.slf4j.Logger.error:116)  - SourceThread: exception during reads.
> > > > > > > > > >> > Swallowed to continue next read.
> > > > > > > > > >> > java.lang.IllegalStateException: Iterator is in failed state
> > > > > > > > > >> >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > I have tried to recover from this state by using this:
> > > > > > > > >> > iterator.resetState(); but it does not recover sometime.
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > > >> > *2) ConsumerFetcherThread are blocked on enqueue ?  What controls size of
> > > > > > > > > >> > queue ? Why are they blocked ?  *Due to this our lags are increasing. our
> > > > > > > > > >> > threads blocked on hasNext()...
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-1"
> > > > > > > > >> > prio=5 tid=0x00007fb36292c800 nid=0xab03 waiting on
> > > condition
> > > > > > > > >> > [0x0000000116379000]
> > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > >> >         - parking to wait for  <0x0000000704019388> (a
> > > > > > > > >> >
> > > > > > >
> > > >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > >> >         at
> > > > > > > > >>
> > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> "ConsumerFetcherThread-Mupd8_Kafka_caq1-fl4-ilo.walmart.com-1414443037185-70e42954-0-2"
> > > > > > > > >> > prio=5 tid=0x00007fb36229e000 nid=0xa903 waiting on
> > > condition
> > > > > > > > >> > [0x0000000116276000]
> > > > > > > > >> >    java.lang.Thread.State: WAITING (parking)
> > > > > > > > >> >         at sun.misc.Unsafe.park(Native Method)
> > > > > > > > >> >         - parking to wait for  <0x0000000704064ce0> (a
> > > > > > > > >> >
> > > > > > >
> > > >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:131)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > >
> > > >
> scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:112)
> > > > > > > > >> >         at kafka.utils.Utils$.inLock(Utils.scala:535)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:111)
> > > > > > > > >> >         at
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > >
> > > > > >
> > > >
> > kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:89)
> > > > > > > > >> >         at
> > > > > > > > >>
> > > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> > Thanks,
> > > > > > > > >> >
> > > > > > > > >> > Bhavesh
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > >> >
> > > > > > > > > >> > On Sun, Oct 26, 2014 at 3:14 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > >> >
> > > > > > > > >> > > Can you provide the steps to reproduce this issue?
> > > > > > > > >> > >
> > > > > > > > >> > > On Fri, Oct 24, 2014 at 6:11 PM, Bhavesh Mistry <
> > > > > > > > >> > > mistry.p.bhavesh@gmail.com>
> > > > > > > > >> > > wrote:
> > > > > > > > >> > >
> > > > > > > > >> > > > I am using one from the Kafka Trunk branch.
> > > > > > > > >> > > >
> > > > > > > > >> > > > Thanks,
> > > > > > > > >> > > >
> > > > > > > > >> > > > Bhavesh
> > > > > > > > >> > > >
> > > > > > > > > >> > > > On Fri, Oct 24, 2014 at 5:24 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:
> > > > > > > > > >> > > >
> > > > > > > > > >> > > > > Which version of Kafka are you using on the consumer?
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > On Fri, Oct 24, 2014 at 4:14 PM, Bhavesh Mistry <
> > > > > > > > >> > > > > mistry.p.bhavesh@gmail.com>
> > > > > > > > >> > > > > wrote:
> > > > > > > > >> > > > >
> > > > > > > > >> > > > > > HI Kafka Community ,
> > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > I am using kafka trunk source code and I get following exception.  What
> > > > > > > > > >> > > > > > could cause the iterator to have FAILED state.  Please let me know how I
> > > > > > > > > >> > > > > > can fix this issue.
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > > *java.lang.IllegalStateException: Iterator is in failed state
> > > > > > > > > >> > > > > >     at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:54)*
> > > > > > > > >> > > > > > Here is Properties:
> > > > > > > > >> > > > > >
> > > > > > > > > >> > > > > >         Properties props = new Properties();
> > > > > > > > > >> > > > > >         props.put("zookeeper.connect", zkConnect);
> > > > > > > > > >> > > > > >         props.put("group.id", groupId);
> > > > > > > > > >> > > > > > *        props.put("consumer.timeout.ms", "-1");*
> > > > > > > > > >> > > > > >         props.put("zookeeper.session.timeout.ms", "10000");
> > > > > > > > > >> > > > > >         props.put("zookeeper.sync.time.ms", "6000");
> > > > > > > > > >> > > > > >         props.put("auto.commit.interval.ms", "2000");
> > > > > > > > > >> > > > > >         props.put("rebalance.max.retries", "8");
> > > > > > > > > >> > > > > >         props.put("auto.offset.reset", "largest");
> > > > > > > > > >> > > > > >         props.put("fetch.message.max.bytes","2097152");
> > > > > > > > > >> > > > > >         props.put("socket.receive.buffer.bytes","2097152");
> > > > > > > > > >> > > > > >         props.put("auto.commit.enable","true");
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Thanks,
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > > > Bhavesh
> > > > > > > > >> > > > > >
> > > > > > > > >> > > > >
> > > > > > > > >> > > >
> > > > > > > > >> > >
> > > > > > > > >> >
> > > > > > > > >>
> > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
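
The thread dumps quoted above show both fetcher threads parked in LinkedBlockingQueue.put, and Neha notes that the queue can fill up if a consumer thread dies or slows down. The sketch below uses only java.util.concurrent to illustrate that behaviour of a bounded queue: once it is full, a producer cannot add another element until something takes one out. The class and method names (BoundedQueueSketch, extraOfferAccepted) and the capacity value are made up for illustration. Treating the capacity as a stand-in for queued.max.message.chunks is an assumption based on Neha's reply, not something verified against the Kafka source.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a fetcher thread feeding a bounded chunk queue.
class BoundedQueueSketch {
    /**
     * Fills a queue of the given capacity, then tries to add one more element.
     * Returns true if the extra element was accepted within one second.
     */
    static boolean extraOfferAccepted(int capacity, boolean consumerDrains) {
        LinkedBlockingQueue<String> chunks = new LinkedBlockingQueue<>(capacity);
        try {
            for (int i = 0; i < capacity; i++) {
                chunks.put("chunk-" + i); // room is still available, so this returns at once
            }
            Thread consumer = null;
            if (consumerDrains) {
                // A live consumer removes one element, which frees a slot.
                consumer = new Thread(() -> {
                    try {
                        chunks.take();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                consumer.start();
            }
            // put() would park here for as long as the queue stays full;
            // offer() with a timeout is used so that the sketch terminates.
            boolean accepted = chunks.offer("one-more", 1, TimeUnit.SECONDS);
            if (consumer != null) {
                consumer.join();
            }
            return accepted;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while running the sketch", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("no consumer:   " + extraOfferAccepted(2, false));
        System.out.println("with consumer: " + extraOfferAccepted(2, true));
    }
}
```

If the application threads that call hasNext() have stopped or stalled, nothing drains the queue and a producer using put() stays parked, which is consistent with the WAITING (parking) state in the dumps. Raising the capacity would only delay that point, so checking that the consuming threads are alive seems the more useful first step.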