Posted to users@kafka.apache.org by "Ghosh, Achintya (Contractor)" <Ac...@comcast.com> on 2016/08/31 19:11:45 UTC

Kafka consumers unable to process message

Hi there,

The Kafka consumer gets stuck in the consumer.poll() method when my current datacenter is down and the replicated messages are in the remote datacenter.

How can I solve this issue?

Thanks
Achintya

Re: Kafka consumers unable to process message

Posted by Jason Gustafson <ja...@confluent.io>.
The replicas are always trying to fetch new data from the partition leader.
When the leader fails, any in-flight fetches will fail and cause errors
such as the ones you saw in the broker log. Eventually the replicas will
discover the new leader and begin fetching again. And of course one of the
replicas will become the new leader.

-Jason
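
The failover described above can be sketched with a deliberately simplified model. This is only an illustration, not Kafka's controller code: the class name, the `electLeader` helper, and the replica lists are hypothetical, and the rule "first surviving in-sync replica wins" is an assumption made for the sketch. It also shows the other case raised further down in the quoted thread: if every replica of a partition sat in the failed datacenter, no leader can be chosen at all.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;

public class LeaderFailoverSketch {

    /**
     * Simplified model: return the first in-sync replica that is still alive,
     * or empty if none survives (the partition would then have no leader).
     * Hypothetical helper for illustration; not Kafka's actual election code.
     */
    public static OptionalInt electLeader(List<Integer> isr, Set<Integer> liveBrokers) {
        return isr.stream()
                  .filter(liveBrokers::contains)
                  .mapToInt(Integer::intValue)
                  .findFirst();
    }

    public static void main(String[] args) {
        // Assumption: brokers 1-3 are the failed datacenter, 4-6 the surviving one.
        Set<Integer> live = new HashSet<>(Arrays.asList(4, 5, 6));

        // Hypothetical ISR with the old leader (broker 3) listed first.
        List<Integer> isr = Arrays.asList(3, 4, 5, 6);
        System.out.println("replicated everywhere: " + electLeader(isr, live));

        // Hypothetical partition whose replicas all lived in the failed datacenter.
        List<Integer> stranded = Arrays.asList(1, 2, 3);
        System.out.println("replicas only in failed DC: " + electLeader(stranded, live));
    }
}
```

In this model the followers' fetch errors are the window between the old leader disappearing and `electLeader` returning a new one; a partition for which it returns empty stays unavailable until one of its replicas comes back.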

On Wed, Aug 31, 2016 at 8:04 PM, Ghosh, Achintya (Contractor) <
Achintya_Ghosh@comcast.com> wrote:

> I'm trying to get the consumer logs and will send them to you. So this means
> it can happen in my local datacenter too. I still don't understand why, if 3
> nodes are up and the messages are already replicated, it is trying to fetch
> the data from the failed node. Can you please explain in a bit more detail
> how it works? Thanks for your response.
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 10:56 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> The exceptions show one of the replica fetcher threads on the broker
> failing, which makes perfect sense since some of the partitions were bound
> to have leaders in the failed datacenter. I'd actually like to see the
> consumer logs at DEBUG level if possible.
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) <
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi Jason,
> >
> > No, I didn't bring down any zookeeper server. I even tried with 3
> > zookeeper servers, one as an 'Observer', but hit the same issue.
> >
> > Here is the server log from one of the nodes of my other datacenter:
> >
> > [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> > (kafka.log.Log)
> > [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> > (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4]
> > Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker
> > BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> > ReplicaFetcherManager)
> > [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to 3 was disconnected before the
> > response was read
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > NetworkClientBlockingOps.scala:87)
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > NetworkClientBlockingOps.scala:84)
> >         at scala.Option.foreach(Option.scala:257)
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > scala:84)
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > scala:80)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> > NetworkClientBlockingOps.scala:137)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollContinuously$extension(
> > NetworkClientBlockingOps.scala:143)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> > extension(NetworkClientBlockingOps.scala:80)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:244)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb
> > (kafka.server.ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4]
> > Removed fetcher for partitions [TEST3,0]
> > (kafka.server.ReplicaFetcherManager)
> > [2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting
> > down
> > (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped
> > (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown
> > completed (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:26:48,025] INFO Creating /controller (is it secure?
> > false)
> > (kafka.utils.ZKCheckedEphemeral)
> > [2016-09-01 01:26:48,034] INFO Result of znode creation is: OK
> > (kafka.utils.ZKCheckedEphemeral)
> > [2016-09-01 01:26:48,035] INFO 4 successfully elected as leader
> > (kafka.server.ZookeeperLeaderElector)
> > [2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.
> > ZookeeperLeaderElector$LeaderChangeListener)
> > [2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4:
> > Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6
> > (kafka.cluster.Partition)
> > [2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> >
> >
> > Why is it trying to connect to node 3 of my local datacenter and
> > throwing an IOException?
> >
> > Thanks
> > Achintya
> >
> > -----Original Message-----
> > From: Jason Gustafson [mailto:jason@confluent.io]
> > Sent: Wednesday, August 31, 2016 10:26 PM
> > To: users@kafka.apache.org
> > Cc: dev@kafka.apache.org
> > Subject: Re: Kafka consumers unable to process message
> >
> > Hi Achintya,
> >
> > Just to clarify, you did not take down either of the zookeepers in
> > this test, right? Having only two zookeepers in the ensemble would
> > mean that if either one of them failed, zookeeper wouldn't be able to
> > reach quorum.
> >
> > I'm not entirely sure why this would happen. One possibility is that
> > the consumer is failing to find the new coordinator, which might
> > happen if all the replicas for one of the __consumer_offsets
> > partitions were located in the "failed" datacenter. Perhaps you can
> > enable DEBUG logging and post some logs so we can see what it's
> > actually doing during poll().
> >
> > By the way, I noticed that your consumer configuration settings seem a
> > little mixed up. The new consumer doesn't actually communicate with
> > Zookeeper, so there's no need for those settings. And you don't need
> > to include the "offsets.storage" option since Kafka is the only
> > choice. Also, I don't think "consumer.timeout.ms" is an option.
> >
> > -Jason
> >
> >
> > On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) <
> > Achintya_Ghosh@comcast.com> wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for your response.
> > >
> > > I know that is a known issue, and I resolved it by calling the wakeup
> > > method from another thread. But my problem here is different. Let me
> > > explain; it's very basic.
> > >
> > > I created one cluster with 6 nodes (3 from one datacenter and 3 from
> > > another, remote datacenter) and kept replication factor 6, with 2
> > > zookeeper servers, one from each datacenter. Then I brought down all
> > > 3 nodes of my local datacenter and produced a few messages, and I see
> > > the producer working fine even though my local datacenter nodes are
> > > down. It successfully writes the messages to the other datacenter's
> > > nodes. But when I try to consume the messages, the consumer.poll
> > > method gets stuck because my local datacenter is down, even though
> > > the other datacenter's nodes are up.
> > >
> > > My question is: since the data has been written successfully to the
> > > other datacenter, why is the consumer part not working?
> > >
> > > Here are my Producer settings:
> > >
> > > props.put("bootstrap.servers",
> > >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> > >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> > >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > > props.put("acks", "1");
> > > props.put("max.block.ms", 1000);
> > > props.put("key.serializer",
> > >     "org.apache.kafka.common.serialization.StringSerializer");
> > > props.put("value.serializer",
> > >     "com.comcast.ps.kafka.object.CustomMessageSer");
> > >
> > > and here are my Consumer settings:
> > >
> > > props.put("group.id", "app-consumer");
> > > props.put("enable.auto.commit", "false");
> > > props.put("auto.offset.reset", "earliest");
> > > props.put("auto.commit.interval.ms", "500");
> > > props.put("session.timeout.ms", "120000");
> > > props.put("consumer.timeout.ms", "10000");
> > > props.put("zookeeper.session.timeout.ms", "120000");
> > > props.put("zookeeper.connection.timeout.ms", "60000");
> > > props.put("offsets.storage", "kafka");
> > > props.put("request.timeout.ms", "150000");
> > > props.put("bootstrap.servers",
> > >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> > >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> > >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > > props.put("key.deserializer",
> > >     "org.apache.kafka.common.serialization.StringDeserializer");
> > > props.put("value.deserializer",
> > >     "com.comcast.ps.kafka.object.CustomMessageDeSer");
> > >
> > > Is it because the consumer is not able to get the broker metadata if
> > > it is trying to connect to the other datacenter's zookeeper server? I
> > > tried increasing the zookeeper session timeout and connection
> > > timeout, but no luck.
> > >
> > > Please help with this.
> > > Thanks
> > > Achintya
> > >
> > >
> > > -----Original Message-----
> > > From: Jason Gustafson [mailto:jason@confluent.io]
> > > Sent: Wednesday, August 31, 2016 4:05 PM
> > > To: users@kafka.apache.org
> > > Cc: dev@kafka.apache.org
> > > Subject: Re: Kafka consumers unable to process message
> > >
> > > Hi Achintya,
> > >
> > > We have a JIRA for this problem:
> > > https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> > > client to raise an exception in this case or do you just want to keep
> > > it from blocking indefinitely? If the latter, you could escape the
> > > poll from another thread using wakeup().
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> > > Achintya_Ghosh@comcast.com> wrote:
> > >
> > > > Hi there,
> > > >
> > > > The Kafka consumer gets stuck in the consumer.poll() method when my
> > > > current datacenter is down and the replicated messages are in the
> > > > remote datacenter.
> > > >
> > > > How can I solve this issue?
> > > >
> > > > Thanks
> > > > Achintya
> > > >
> > >
> >
>
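
The quorum remark in the quoted thread (with only two zookeepers, losing either one prevents quorum) comes down to majority arithmetic. The sketch below works it out; the class and method names are hypothetical. It assumes a strict majority of voting members is required and that Observers do not vote, which would explain why adding a third server as an 'Observer' did not change the picture.

```java
public class QuorumSketch {

    /** Smallest strict majority of the voting members. */
    public static int quorumSize(int votingMembers) {
        return votingMembers / 2 + 1;
    }

    /** How many voting members can fail while a majority still remains. */
    public static int toleratedFailures(int votingMembers) {
        return votingMembers - quorumSize(votingMembers);
    }

    public static void main(String[] args) {
        // 2 voters is the setup from the thread; 3 and 5 are shown for comparison.
        for (int n : new int[] {2, 3, 5}) {
            System.out.printf("voters=%d quorum=%d tolerated failures=%d%n",
                    n, quorumSize(n), toleratedFailures(n));
        }
    }
}
```

With two voters the quorum is two, so no failure is tolerated; three voting servers are the smallest ensemble that survives the loss of one.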

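Following the configuration advice in the quoted thread, a trimmed version of the consumer settings might look like the sketch below. It drops the zookeeper.* entries, "offsets.storage", and "consumer.timeout.ms", which the reply flags as unnecessary or not valid for the new consumer. Leaving out "auto.commit.interval.ms" is an extra assumption here, on the grounds that auto commit is disabled. The class and method names are hypothetical, and only java.util.Properties is used, so no Kafka client is needed to run it.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    /** Builds the trimmed consumer settings; values are taken from the thread. */
    public static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "app-consumer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "120000");
        props.put("request.timeout.ms", "150000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Custom deserializer class named in the original post.
        props.put("value.deserializer",
                "com.comcast.ps.kafka.object.CustomMessageDeSer");
        return props;
    }

    public static void main(String[] args) {
        String servers =
                "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
                + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
                + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617";
        // Print each remaining setting so the trimmed list is easy to review.
        consumerProps(servers).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Cleaning up the settings is not expected to fix the blocked poll() by itself; it only removes options that the new consumer does not use.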
Re: Kafka consumers unable to process message

Posted by Jason Gustafson <ja...@confluent.io>.
The replicas are always trying to fetch new data from the partition leader.
When the leader fails, any in-flight fetches will fail and cause errors
such as the ones you saw in the broker log. Eventually the replicas will
discover the new leader and begin fetching again. And of course one of the
replicas will become the new leader.

-Jason

On Wed, Aug 31, 2016 at 8:04 PM, Ghosh, Achintya (Contractor) <
Achintya_Ghosh@comcast.com> wrote:

> I'm trying get the consumer logs and will send you. So it means it can
> happen even my local datacenter too. Still I'm not understanding if 3 nodes
> are up and message already replicated why it's trying to fetch the data
> from failed node. Can you please explain bit details how it works. Thanks
> for your response.
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 10:56 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> The exceptions show one of the replica fetcher threads on the broker
> failing which makes perfect sense since some of the partitions were bound
> to have leaders in the failed datacenter. I'd actually like to see the
> consumer logs at DEBUG level if possible.
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) <
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi Jason,
> >
> > No, I didn't bring down any zookeeper server. Even I tried with 3
> > zookeeper server one as an 'Observer' but the same issue.
> >
> > Here is the server log from one of the node of my other datacenter:
> >
> > [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> > (kafka.log.Log)
> > [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> > (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4]
> > Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker
> > BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> > ReplicaFetcherManager)
> > [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to 3 was disconnected before the
> > response was read
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > NetworkClientBlockingOps.scala:87)
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> > NetworkClientBlockingOps.scala:84)
> >         at scala.Option.foreach(Option.scala:257)
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > scala:84)
> >         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> > blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> > scala:80)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> > NetworkClientBlockingOps.scala:137)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollContinuously$extension(
> > NetworkClientBlockingOps.scala:143)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> > extension(NetworkClientBlockingOps.scala:80)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:244)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb
> (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in
> > fetch
> > kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.
> > ReplicaFetcherThread)
> > java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id:
> > 3
> > rack: null) failed
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:63)
> >         at
> > kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> > extension$2.apply(NetworkClientBlockingOps.scala:59)
> >         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> > NetworkClientBlockingOps.scala:112)
> >         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> > NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> > scala:120)
> >         at
> > kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> > NetworkClientBlockingOps.scala:59)
> >         at kafka.server.ReplicaFetcherThread.sendRequest(
> > ReplicaFetcherThread.scala:239)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:229)
> >         at kafka.server.ReplicaFetcherThread.fetch(
> > ReplicaFetcherThread.scala:42)
> >         at kafka.server.AbstractFetcherThread.processFetchRequest(
> > AbstractFetcherThread.scala:107)
> >         at kafka.server.AbstractFetcherThread.doWork(
> > AbstractFetcherThread.scala:98)
> >         at
> > kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> > [2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4]
> > Removed fetcher for partitions [TEST3,0]
> > (kafka.server.ReplicaFetcherManager)
> > [2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting
> > down
> > (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped
> > (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown
> > completed (kafka.server.ReplicaFetcherThread)
> > [2016-09-01 01:26:48,025] INFO Creating /controller (is it secure?
> > false)
> > (kafka.utils.ZKCheckedEphemeral)
> > [2016-09-01 01:26:48,034] INFO Result of znode creation is: OK
> > (kafka.utils.ZKCheckedEphemeral)
> > [2016-09-01 01:26:48,035] INFO 4 successfully elected as leader
> > (kafka.server.ZookeeperLeaderElector)
> > [2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.
> > ZookeeperLeaderElector$LeaderChangeListener)
> > [2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4:
> > Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6
> > (kafka.cluster.Partition)
> > [2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> > [2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> > Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> > GroupMetadataManager)
> >
> >
> > Why it's trying to connect the node3 of my local datacenter and it's
> > throwing IOException.
> >
> > Thanks
> > Achintya
> >
> > -----Original Message-----
> > From: Jason Gustafson [mailto:jason@confluent.io]
> > Sent: Wednesday, August 31, 2016 10:26 PM
> > To: users@kafka.apache.org
> > Cc: dev@kafka.apache.org
> > Subject: Re: Kafka consumers unable to process message
> >
> > Hi Achintya,
> >
> > Just to clarify, you did not take down either of the zookeepers in
> > this test, right? Having only two zookeepers in the ensemble would
> > mean that if either one of them failed, zookeeper wouldn't be able to
> reach quorum.
> >
> > I'm not entirely sure why this would happen. One possibility is that
> > the consumer is failing to find the new coordinator, which might
> > happen if all the replicas for one of the __consumer_offsets
> > partitions were located in the "failed" datacenter. Perhaps you can
> > enable DEBUG logging and post some logs so we can see what it's actually
> doing during poll().
> >
> > By the way, I noticed that your consumer configuration settings seem a
> > little mixed up. The new consumer doesn't actually communicate with
> > Zookeeper, so there's no need for those settings. And you don't need
> > to include the "offsets.storage" option since Kafka is the only
> > choice. Also, I don't think "consumer.timeout.ms" is an option.
> >
> > -Jason
> >
> >
> > On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) <
> > Achintya_Ghosh@comcast.com> wrote:
> >
> > > Hi Jason,
> > >
> > > Thanks for your response.
> > >
> > > I know that is a known issue, and I resolved it by calling the wakeup
> > > method from another thread. But my problem here is different; let me
> > > explain. It's very basic.
> > >
> > > I created one cluster with 6 nodes (3 from one datacenter and 3 from
> > > another, remote datacenter) and kept a replication factor of 6, with 2
> > > zookeeper servers, one from each datacenter. Then I brought down all
> > > 3 nodes of my local datacenter and produced a few messages, and I see
> > > that the producer works fine even when my local datacenter nodes are
> > > down. It successfully writes the messages to the other datacenter's
> > > nodes. But when I try to consume the messages, the consumer.poll method
> > > gets stuck because my local datacenter is down, even though the other
> > > datacenter's nodes are up.
> > >
> > > My question is: since the data has been written successfully to the
> > > other datacenter, why is the consumer not working?
> > >
> > > Here are my producer settings:
> > >
> > > props.put("bootstrap.servers",
> > >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> > >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> > >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > > props.put("acks", "1");
> > > props.put("max.block.ms", 1000);
> > > props.put("key.serializer",
> > >     "org.apache.kafka.common.serialization.StringSerializer");
> > > props.put("value.serializer",
> > >     "com.comcast.ps.kafka.object.CustomMessageSer");
> > >
> > > And here are my consumer settings:
> > >
> > > props.put("group.id", "app-consumer");
> > > props.put("enable.auto.commit", "false");
> > > props.put("auto.offset.reset", "earliest");
> > > props.put("auto.commit.interval.ms", "500");
> > > props.put("session.timeout.ms", "120000");
> > > props.put("consumer.timeout.ms", "10000");
> > > props.put("zookeeper.session.timeout.ms", "120000");
> > > props.put("zookeeper.connection.timeout.ms", "60000");
> > > props.put("offsets.storage", "kafka");
> > > props.put("request.timeout.ms", "150000");
> > > props.put("bootstrap.servers",
> > >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> > >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> > >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > > props.put("key.deserializer",
> > >     "org.apache.kafka.common.serialization.StringDeserializer");
> > > props.put("value.deserializer",
> > >     "com.comcast.ps.kafka.object.CustomMessageDeSer");
> > >
> > > Is it because the consumer is not able to get the broker metadata when
> > > it tries to connect to the other datacenter's zookeeper server? I tried
> > > increasing the zookeeper session timeout and connection timeout, but no
> > > luck.
> > >
> > > Please help with this.
> > > Thanks
> > > Achintya
> > >
> > >
> > > -----Original Message-----
> > > From: Jason Gustafson [mailto:jason@confluent.io]
> > > Sent: Wednesday, August 31, 2016 4:05 PM
> > > To: users@kafka.apache.org
> > > Cc: dev@kafka.apache.org
> > > Subject: Re: Kafka consumers unable to process message
> > >
> > > Hi Achintya,
> > >
> > > We have a JIRA for this problem:
> > > https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> > > client to raise an exception in this case or do you just want to keep
> > > it from blocking indefinitely? If the latter, you could escape the poll
> > > from another thread using wakeup().
> > >
> > > Thanks,
> > > Jason
> > >
> > > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> > > Achintya_Ghosh@comcast.com> wrote:
> > >
> > > > Hi there,
> > > >
> > > > Kafka consumer gets stuck at consumer.poll() method if my current
> > > > datacenter is down and replicated messages are in remote datacenter.
> > > >
> > > > How to solve that issue?
> > > >
> > > > Thanks
> > > > Achintya
> > > >
> > >
> >
>
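
The quorum point quoted above (a two-server ZooKeeper ensemble cannot lose either server) comes down to simple majority arithmetic. Below is a minimal sketch of that arithmetic, assuming the usual majority-quorum rule; the class and method names are hypothetical, and the code does not connect to ZooKeeper. Observers are non-voting members, so an ensemble of two participants plus one observer still counts as two voting servers here.

```java
final class ZkQuorum {
    /** Smallest number of voting servers that forms a strict majority. */
    static int quorumSize(int votingServers) {
        if (votingServers < 1) {
            throw new IllegalArgumentException("need at least one voting server");
        }
        return votingServers / 2 + 1;
    }

    /** Number of voting servers that can fail while a majority still remains. */
    static int toleratedFailures(int votingServers) {
        return votingServers - quorumSize(votingServers);
    }

    public static void main(String[] args) {
        // Print the table for small ensembles; observers are not counted.
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " voting server(s): quorum " + quorumSize(n)
                    + ", tolerates " + toleratedFailures(n) + " failure(s)");
        }
    }
}
```

With two voting servers the quorum size is two, so no failure is tolerated; three voting servers tolerate one failure.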

RE: Kafka consumers unable to process message

Posted by "Ghosh, Achintya (Contractor)" <Ac...@comcast.com>.
I'm trying to get the consumer logs and will send them to you. So this means it can happen in my local datacenter too. I still don't understand why, if 3 nodes are up and the messages are already replicated, it is trying to fetch the data from the failed node. Could you please explain in a bit more detail how this works? Thanks for your response.

-----Original Message-----
From: Jason Gustafson [mailto:jason@confluent.io] 
Sent: Wednesday, August 31, 2016 10:56 PM
To: users@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

The exceptions show one of the replica fetcher threads on the broker failing which makes perfect sense since some of the partitions were bound to have leaders in the failed datacenter. I'd actually like to see the consumer logs at DEBUG level if possible.

Thanks,
Jason

On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) < Achintya_Ghosh@comcast.com> wrote:

> Hi Jason,
>
> No, I didn't bring down any zookeeper server. I even tried with 3
> zookeeper servers, one of them as an 'Observer', but saw the same issue.
>
> Here is the server log from one of the nodes of my other datacenter:
>
> [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> (kafka.log.Log)
> [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] 
> Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker 
> BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> ReplicaFetcherManager)
> [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the 
> response was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> NetworkClientBlockingOps.scala:137)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollContinuously$extension(
> NetworkClientBlockingOps.scala:143)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:244)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4] 
> Removed fetcher for partitions [TEST3,0] 
> (kafka.server.ReplicaFetcherManager)
> [2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting 
> down
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown 
> completed (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:48,025] INFO Creating /controller (is it secure? 
> false)
> (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,034] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,035] INFO 4 successfully elected as leader
> (kafka.server.ZookeeperLeaderElector)
> [2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.
> ZookeeperLeaderElector$LeaderChangeListener)
> [2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4: 
> Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6 
> (kafka.cluster.Partition)
> [2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
>
>
> Why is it trying to connect to node3 of my local datacenter, and why is it
> throwing an IOException?
>
> Thanks
> Achintya
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 10:26 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> Just to clarify, you did not take down either of the zookeepers in 
> this test, right? Having only two zookeepers in the ensemble would 
> mean that if either one of them failed, zookeeper wouldn't be able to reach quorum.
>
> I'm not entirely sure why this would happen. One possibility is that 
> the consumer is failing to find the new coordinator, which might 
> happen if all the replicas for one of the __consumer_offsets 
> partitions were located in the "failed" datacenter. Perhaps you can 
> enable DEBUG logging and post some logs so we can see what it's actually doing during poll().
>
> By the way, I noticed that your consumer configuration settings seem a 
> little mixed up. The new consumer doesn't actually communicate with 
> Zookeeper, so there's no need for those settings. And you don't need 
> to include the "offsets.storage" option since Kafka is the only 
> choice. Also, I don't think "consumer.timeout.ms" is an option.
>
> -Jason
>
>
> On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) < 
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your response.
> >
> > I know that is a known issue, and I resolved it by calling the wakeup
> > method from another thread. But my problem here is different; let me
> > explain. It's very basic.
> >
> > I created one cluster with 6 nodes (3 from one datacenter and 3 from
> > another, remote datacenter) and kept a replication factor of 6, with 2
> > zookeeper servers, one from each datacenter. Then I brought down all
> > 3 nodes of my local datacenter and produced a few messages, and I see
> > that the producer works fine even when my local datacenter nodes are
> > down. It successfully writes the messages to the other datacenter's
> > nodes. But when I try to consume the messages, the consumer.poll method
> > gets stuck because my local datacenter is down, even though the other
> > datacenter's nodes are up.
> >
> > My question is: since the data has been written successfully to the
> > other datacenter, why is the consumer not working?
> >
> > Here are my producer settings:
> >
> > props.put("bootstrap.servers",
> >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("acks", "1");
> > props.put("max.block.ms", 1000);
> > props.put("key.serializer",
> >     "org.apache.kafka.common.serialization.StringSerializer");
> > props.put("value.serializer",
> >     "com.comcast.ps.kafka.object.CustomMessageSer");
> >
> > And here are my consumer settings:
> >
> > props.put("group.id", "app-consumer");
> > props.put("enable.auto.commit", "false");
> > props.put("auto.offset.reset", "earliest");
> > props.put("auto.commit.interval.ms", "500");
> > props.put("session.timeout.ms", "120000");
> > props.put("consumer.timeout.ms", "10000");
> > props.put("zookeeper.session.timeout.ms", "120000");
> > props.put("zookeeper.connection.timeout.ms", "60000");
> > props.put("offsets.storage", "kafka");
> > props.put("request.timeout.ms", "150000");
> > props.put("bootstrap.servers",
> >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("key.deserializer",
> >     "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer",
> >     "com.comcast.ps.kafka.object.CustomMessageDeSer");
> >
> > Is it because the consumer is not able to get the broker metadata when
> > it tries to connect to the other datacenter's zookeeper server? I tried
> > increasing the zookeeper session timeout and connection timeout, but no
> > luck.
> >
> > Please help with this.
> > Thanks
> > Achintya
> >
> >
> > -----Original Message-----
> > From: Jason Gustafson [mailto:jason@confluent.io]
> > Sent: Wednesday, August 31, 2016 4:05 PM
> > To: users@kafka.apache.org
> > Cc: dev@kafka.apache.org
> > Subject: Re: Kafka consumers unable to process message
> >
> > Hi Achintya,
> >
> > We have a JIRA for this problem:
> > https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> > client to raise an exception in this case or do you just want to keep
> > it from blocking indefinitely? If the latter, you could escape the poll
> > from another thread using wakeup().
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) < 
> > Achintya_Ghosh@comcast.com> wrote:
> >
> > > Hi there,
> > >
> > > Kafka consumer gets stuck at consumer.poll() method if my current 
> > > datacenter is down and replicated messages are in remote datacenter.
> > >
> > > How to solve that issue?
> > >
> > > Thanks
> > > Achintya
> > >
> >
>
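
Following Jason's remarks on the consumer settings quoted above, a trimmed configuration might look like the sketch below. It assumes the new Java consumer and only builds a java.util.Properties object, so it runs without the Kafka client on the classpath. The class and method names are hypothetical; the host names and values are copied from the thread. The four keys Jason calls out (consumer.timeout.ms, the two zookeeper.* timeouts, and offsets.storage) are dropped. Leaving out auto.commit.interval.ms as well is an extra assumption, made because that setting only applies when enable.auto.commit is true.

```java
import java.util.Properties;

final class ConsumerConfigSketch {
    // Consumer settings from the thread, minus the keys that the new
    // consumer does not use (see the note above for which ones and why).
    static Properties newConsumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
                + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
                + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
        props.put("group.id", "app-consumer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "120000");
        props.put("request.timeout.ms", "150000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "com.comcast.ps.kafka.object.CustomMessageDeSer");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting key/value pairs for inspection.
        newConsumerProps().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The returned Properties object would then be passed to the consumer's constructor in place of the original settings.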

RE: Kafka consumers unable to process message

Posted by "Ghosh, Achintya (Contractor)" <Ac...@comcast.com>.
I'm trying get the consumer logs and will send you. So it means it can happen even my local datacenter too. Still I'm not understanding if 3 nodes are up and message already replicated why it's trying to fetch the data from failed node. Can you please explain bit details how it works. Thanks for your response.

-----Original Message-----
From: Jason Gustafson [mailto:jason@confluent.io] 
Sent: Wednesday, August 31, 2016 10:56 PM
To: users@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

The exceptions show one of the replica fetcher threads on the broker failing which makes perfect sense since some of the partitions were bound to have leaders in the failed datacenter. I'd actually like to see the consumer logs at DEBUG level if possible.

Thanks,
Jason

On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) < Achintya_Ghosh@comcast.com> wrote:

> Hi Jason,
>
> No, I didn't bring down any zookeeper server. Even I tried with 3 
> zookeeper server one as an 'Observer' but the same issue.
>
> Here is the server log from one of the node of my other datacenter:
>
> [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> (kafka.log.Log)
> [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] 
> Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker 
> BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> ReplicaFetcherManager)
> [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the 
> response was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> NetworkClientBlockingOps.scala:137)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollContinuously$extension(
> NetworkClientBlockingOps.scala:143)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:244)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in 
> fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 
> 3
> rack: null) failed
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at 
> kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at 
> kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at 
> kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4] 
> Removed fetcher for partitions [TEST3,0] 
> (kafka.server.ReplicaFetcherManager)
> [2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting 
> down
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown 
> completed (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:48,025] INFO Creating /controller (is it secure? 
> false)
> (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,034] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,035] INFO 4 successfully elected as leader
> (kafka.server.ZookeeperLeaderElector)
> [2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.
> ZookeeperLeaderElector$LeaderChangeListener)
> [2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4: 
> Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6 
> (kafka.cluster.Partition)
> [2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
>
>
> Why is it trying to connect to node 3 of my local datacenter, and why is
> it throwing an IOException?
>
> Thanks
> Achintya
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 10:26 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> Just to clarify, you did not take down either of the zookeepers in 
> this test, right? Having only two zookeepers in the ensemble would 
> mean that if either one of them failed, zookeeper wouldn't be able to reach quorum.
>
> I'm not entirely sure why this would happen. One possibility is that 
> the consumer is failing to find the new coordinator, which might 
> happen if all the replicas for one of the __consumer_offsets 
> partitions were located in the "failed" datacenter. Perhaps you can 
> enable DEBUG logging and post some logs so we can see what it's actually doing during poll().
>
> By the way, I noticed that your consumer configuration settings seem a 
> little mixed up. The new consumer doesn't actually communicate with 
> Zookeeper, so there's no need for those settings. And you don't need 
> to include the "offsets.storage" option since Kafka is the only 
> choice. Also, I don't think "consumer.timeout.ms" is an option.
>
> -Jason
>
>
> On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) < 
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your response.
> >
> > I know that is a known issue, and I resolved it by calling the wakeup
> > method from another thread. But my problem here is different. Let me
> > explain; it's very basic.
> >
> > I created one cluster with 6 nodes (3 in one datacenter and 3 in
> > another, remote datacenter), with replication factor 6 and 2 zookeeper
> > servers, one in each datacenter. I then brought down all 3 nodes of my
> > local datacenter and produced a few messages. The producer works fine
> > even though my local datacenter nodes are down: it successfully writes
> > the messages to the other datacenter's nodes. But when I try to consume
> > the messages, the consumer.poll method gets stuck because my local
> > datacenter is down, even though the other datacenter's nodes are up.
> >
> > My question is: since the data has been written successfully to the
> > other datacenter, why is the consumer not working?
> >
> > Here are my producer settings:
> >
> > props.put("bootstrap.servers",
> >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("acks", "1");
> > props.put("max.block.ms", 1000);
> > props.put("key.serializer",
> >     "org.apache.kafka.common.serialization.StringSerializer");
> > props.put("value.serializer",
> >     "com.comcast.ps.kafka.object.CustomMessageSer");
> >
> > And here are my consumer settings:
> >
> > props.put("group.id", "app-consumer");
> > props.put("enable.auto.commit", "false");
> > props.put("auto.offset.reset", "earliest");
> > props.put("auto.commit.interval.ms", "500");
> > props.put("session.timeout.ms", "120000");
> > props.put("consumer.timeout.ms", "10000");
> > props.put("zookeeper.session.timeout.ms", "120000");
> > props.put("zookeeper.connection.timeout.ms", "60000");
> > props.put("offsets.storage", "kafka");
> > props.put("request.timeout.ms", "150000");
> > props.put("bootstrap.servers",
> >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("key.deserializer",
> >     "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer",
> >     "com.comcast.ps.kafka.object.CustomMessageDeSer");
> >
> > Is it because the consumer is unable to get the broker metadata when it
> > tries to connect to the other datacenter's zookeeper server? I tried
> > increasing the zookeeper session timeout and connection timeout, but no
> > luck.
> >
> > Please help with this.
> > Thanks
> > Achintya
> >
> >
> > -----Original Message-----
> > From: Jason Gustafson [mailto:jason@confluent.io]
> > Sent: Wednesday, August 31, 2016 4:05 PM
> > To: users@kafka.apache.org
> > Cc: dev@kafka.apache.org
> > Subject: Re: Kafka consumers unable to process message
> >
> > Hi Achintya,
> >
> > We have a JIRA for this problem:
> > https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> > client to raise an exception in this case, or do you just want to keep
> > it from blocking indefinitely? If the latter, you could escape the poll
> > from another thread using wakeup().
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) < 
> > Achintya_Ghosh@comcast.com> wrote:
> >
> > > Hi there,
> > >
> > > The Kafka consumer gets stuck in the consumer.poll() method if my
> > > current datacenter is down and the replicated messages are in the
> > > remote datacenter.
> > >
> > > How can I solve this issue?
> > >
> > > Thanks
> > > Achintya
> > >
> >
>
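
As a rough illustration of the wakeup() suggestion quoted above, here is a
minimal sketch of a watchdog that fires a wakeup action from a second thread
when a blocking call overruns. It uses only the JDK so that it runs as-is.
The class PollWatchdog, the method callWithWakeup, and the timeout values are
hypothetical names made up for this example. With a real consumer, the
assumption is that the blocking call would be the consumer's poll() and the
wakeup action would be consumer::wakeup, with the caller catching the
WakeupException that poll() throws once it has been woken up.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/** Hypothetical helper: runs a blocking call and triggers a wakeup action if it overruns. */
public final class PollWatchdog {

    private PollWatchdog() {
    }

    /**
     * Runs blockingCall on the calling thread. If it has not returned after
     * timeoutMs, wakeup is invoked once from a separate scheduler thread.
     * Assumption: with Kafka, wakeup would be consumer::wakeup.
     */
    public static <T> T callWithWakeup(Supplier<T> blockingCall, Runnable wakeup, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        ScheduledFuture<?> pending = scheduler.schedule(wakeup, timeoutMs, TimeUnit.MILLISECONDS);
        try {
            return blockingCall.get();
        } finally {
            // The call returned (or threw): make sure a late wakeup does not fire.
            pending.cancel(false);
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Stand-in for a poll() that never returns on its own.
        AtomicBoolean woken = new AtomicBoolean(false);
        String result = callWithWakeup(() -> {
            while (!woken.get()) {
                Thread.yield();
            }
            return "blocking call released by wakeup";
        }, () -> woken.set(true), 200);
        System.out.println(result);
    }
}
```

Note that this only keeps the caller from blocking forever. It does not
explain why poll() is stuck in the first place, which is what the DEBUG logs
are needed for.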

Re: Kafka consumers unable to process message

Posted by Jason Gustafson <ja...@confluent.io>.
The exceptions show one of the replica fetcher threads on the broker
failing, which makes perfect sense since some of the partitions were bound
to have leaders in the failed datacenter. I'd actually like to see the
consumer logs at DEBUG level if possible.

Thanks,
Jason
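
To make the explanation above a bit more concrete, here is a toy model (plain
JDK, no Kafka classes) of what the broker log quoted below suggests happened
to TEST3-0: broker 4 was fetching from leader 3, broker 3 went away, and a
surviving replica took over. The class name, the broker numbering (1-3 local,
4-6 remote), the replica order, and the "first live in-sync replica wins"
rule are simplifying assumptions for illustration only, not the controller's
actual election code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of follower fetches and leader failover; not Kafka's real logic. */
public final class LeaderFailoverSketch {

    private LeaderFailoverSketch() {
    }

    /** A follower can only fetch while the leader's broker is reachable. */
    public static boolean canFetchFrom(int leader, Set<Integer> liveBrokers) {
        return liveBrokers.contains(leader);
    }

    /**
     * Simplified election: the first in-sync replica that is still alive
     * becomes the leader. Returns -1 if no in-sync replica is alive.
     */
    public static int electLeader(List<Integer> inSyncReplicas, Set<Integer> liveBrokers) {
        for (int broker : inSyncReplicas) {
            if (liveBrokers.contains(broker)) {
                return broker;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Assumed state before the outage: leader 3, all six replicas in sync.
        List<Integer> inSyncReplicas = Arrays.asList(3, 4, 5, 6, 1, 2);
        int leader = 3;

        // The local datacenter (assumed to be brokers 1-3) goes down.
        Set<Integer> liveBrokers = new HashSet<>(Arrays.asList(4, 5, 6));

        // Followers keep trying the old leader until they learn about the new one.
        System.out.println("fetch from old leader " + leader + " works: "
                + canFetchFrom(leader, liveBrokers));

        int newLeader = electLeader(inSyncReplicas, liveBrokers);
        System.out.println("new leader: " + newLeader);
        System.out.println("fetch from new leader works: "
                + canFetchFrom(newLeader, liveBrokers));
    }
}
```

In this picture the IOExceptions are the failed fetches against the old
leader during the window before the new leader is known.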

On Wed, Aug 31, 2016 at 7:48 PM, Ghosh, Achintya (Contractor) <
Achintya_Ghosh@comcast.com> wrote:

> Hi Jason,
>
> No, I didn't bring down any zookeeper server. I even tried with 3
> zookeeper servers, one of them as an 'Observer', but hit the same issue.
>
> Here is the server log from one of the nodes in my other datacenter:
>
> [2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0.
> (kafka.log.Log)
> [2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added
> fetcher for partitions List([[TEST3,0], initOffset 0 to broker
> BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.
> ReplicaFetcherManager)
> [2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to 3 was disconnected before the response
> was read
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:87)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(
> NetworkClientBlockingOps.scala:84)
>         at scala.Option.foreach(Option.scala:257)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:84)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$
> blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.
> scala:80)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(
> NetworkClientBlockingOps.scala:137)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollContinuously$extension(
> NetworkClientBlockingOps.scala:143)
>         at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$
> extension(NetworkClientBlockingOps.scala:80)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:244)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3
> rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3
> rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3
> rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in fetch
> kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.
> ReplicaFetcherThread)
> java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3
> rack: null) failed
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:63)
>         at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$
> extension$2.apply(NetworkClientBlockingOps.scala:59)
>         at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(
> NetworkClientBlockingOps.scala:112)
>         at kafka.utils.NetworkClientBlockingOps$.kafka$utils$
> NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.
> scala:120)
>         at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(
> NetworkClientBlockingOps.scala:59)
>         at kafka.server.ReplicaFetcherThread.sendRequest(
> ReplicaFetcherThread.scala:239)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:229)
>         at kafka.server.ReplicaFetcherThread.fetch(
> ReplicaFetcherThread.scala:42)
>         at kafka.server.AbstractFetcherThread.processFetchRequest(
> AbstractFetcherThread.scala:107)
>         at kafka.server.AbstractFetcherThread.doWork(
> AbstractFetcherThread.scala:98)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
> [2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4] Removed
> fetcher for partitions [TEST3,0] (kafka.server.ReplicaFetcherManager)
> [2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting down
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped
> (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown
> completed (kafka.server.ReplicaFetcherThread)
> [2016-09-01 01:26:48,025] INFO Creating /controller (is it secure? false)
> (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,034] INFO Result of znode creation is: OK
> (kafka.utils.ZKCheckedEphemeral)
> [2016-09-01 01:26:48,035] INFO 4 successfully elected as leader
> (kafka.server.ZookeeperLeaderElector)
> [2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.
> ZookeeperLeaderElector$LeaderChangeListener)
> [2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4: Shrinking
> ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6 (kafka.cluster.Partition)
> [2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
> [2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]:
> Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.
> GroupMetadataManager)
>
>
> Why is it trying to connect to node 3 of my local datacenter, and why is
> it throwing an IOException?
>
> Thanks
> Achintya
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 10:26 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> Just to clarify, you did not take down either of the zookeepers in this
> test, right? Having only two zookeepers in the ensemble would mean that if
> either one of them failed, zookeeper wouldn't be able to reach quorum.
>
> I'm not entirely sure why this would happen. One possibility is that the
> consumer is failing to find the new coordinator, which might happen if all
> the replicas for one of the __consumer_offsets partitions were located in
> the "failed" datacenter. Perhaps you can enable DEBUG logging and post some
> logs so we can see what it's actually doing during poll().
>
> By the way, I noticed that your consumer configuration settings seem a
> little mixed up. The new consumer doesn't actually communicate with
> Zookeeper, so there's no need for those settings. And you don't need to
> include the "offsets.storage" option since Kafka is the only choice. Also,
> I don't think "consumer.timeout.ms" is an option.
>
> -Jason
>
>
> On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) <
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your response.
> >
> > I know that is a known issue, and I resolved it by calling the wakeup
> > method from another thread. But my problem here is different. Let me
> > explain; it's very basic.
> >
> > I created one cluster with 6 nodes (3 in one datacenter and 3 in
> > another, remote datacenter), with replication factor 6 and 2 zookeeper
> > servers, one in each datacenter. I then brought down all 3 nodes of my
> > local datacenter and produced a few messages. The producer works fine
> > even though my local datacenter nodes are down: it successfully writes
> > the messages to the other datacenter's nodes. But when I try to consume
> > the messages, the consumer.poll method gets stuck because my local
> > datacenter is down, even though the other datacenter's nodes are up.
> >
> > My question is: since the data has been written successfully to the
> > other datacenter, why is the consumer not working?
> >
> > Here are my producer settings:
> >
> > props.put("bootstrap.servers",
> >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("acks", "1");
> > props.put("max.block.ms", 1000);
> > props.put("key.serializer",
> >     "org.apache.kafka.common.serialization.StringSerializer");
> > props.put("value.serializer",
> >     "com.comcast.ps.kafka.object.CustomMessageSer");
> >
> > And here are my consumer settings:
> >
> > props.put("group.id", "app-consumer");
> > props.put("enable.auto.commit", "false");
> > props.put("auto.offset.reset", "earliest");
> > props.put("auto.commit.interval.ms", "500");
> > props.put("session.timeout.ms", "120000");
> > props.put("consumer.timeout.ms", "10000");
> > props.put("zookeeper.session.timeout.ms", "120000");
> > props.put("zookeeper.connection.timeout.ms", "60000");
> > props.put("offsets.storage", "kafka");
> > props.put("request.timeout.ms", "150000");
> > props.put("bootstrap.servers",
> >     "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
> >     + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
> >     + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> > props.put("key.deserializer",
> >     "org.apache.kafka.common.serialization.StringDeserializer");
> > props.put("value.deserializer",
> >     "com.comcast.ps.kafka.object.CustomMessageDeSer");
> >
> > Is it because the consumer is unable to get the broker metadata when it
> > tries to connect to the other datacenter's zookeeper server? I tried
> > increasing the zookeeper session timeout and connection timeout, but no
> > luck.
> >
> > Please help with this.
> > Thanks
> > Achintya
> >
> >
> > -----Original Message-----
> > From: Jason Gustafson [mailto:jason@confluent.io]
> > Sent: Wednesday, August 31, 2016 4:05 PM
> > To: users@kafka.apache.org
> > Cc: dev@kafka.apache.org
> > Subject: Re: Kafka consumers unable to process message
> >
> > Hi Achintya,
> >
> > We have a JIRA for this problem:
> > https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> > client to raise an exception in this case, or do you just want to keep
> > it from blocking indefinitely? If the latter, you could escape the poll
> > from another thread using wakeup().
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> > Achintya_Ghosh@comcast.com> wrote:
> >
> > > Hi there,
> > >
> > > The Kafka consumer gets stuck in the consumer.poll() method if my
> > > current datacenter is down and the replicated messages are in the
> > > remote datacenter.
> > >
> > > How can I solve this issue?
> > >
> > > Thanks
> > > Achintya
> > >
> >
>
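
Following the remarks quoted above about the consumer settings, a trimmed-down
configuration might look roughly like the sketch below. The class and method
names are made up for illustration, and the values are simply copied from the
settings quoted above. The zookeeper.* entries, "offsets.storage" and
"consumer.timeout.ms" are left out as suggested in the thread. Leaving out
"auto.commit.interval.ms" is an additional simplification of this sketch,
since that interval only matters when enable.auto.commit is true.

```java
import java.util.Properties;

/** Hypothetical helper that builds the consumer settings without the ZooKeeper-era keys. */
public final class ConsumerConfigSketch {

    private ConsumerConfigSketch() {
    }

    public static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        // The new consumer talks to the brokers only, so no zookeeper.* settings.
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "app-consumer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "120000");
        props.put("request.timeout.ms", "150000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "com.comcast.ps.kafka.object.CustomMessageDeSer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(
                "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
                + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
                + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
        // Print the resulting settings for inspection.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The returned Properties object can be passed to the consumer constructor in
the same way as the original settings.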

>
> I'm not entirely sure why this would happen. One possibility is that the
> consumer is failing to find the new coordinator, which might happen if all
> the replicas for one of the __consumer_offsets partitions were located in
> the "failed" datacenter. Perhaps you can enable DEBUG logging and post some
> logs so we can see what it's actually doing during poll().
>
> By the way, I noticed that your consumer configuration settings seem a
> little mixed up. The new consumer doesn't actually communicate with
> Zookeeper, so there's no need for those settings. And you don't need to
> include the "offsets.storage" option since Kafka is the only choice. Also,
> I don't think "consumer.timeout.ms" is an option.
>
> -Jason
>
>
> On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) <
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi Jason,
> >
> > Thanks for your response.
> >
> > I know that is a known issue and I resolved it calling wakeup method
> > by another thread. But here my problem is different, let me explain ,
> > it's very basic
> >
> > I created one cluster with 6 nodes( 3 from one datacenter and 3 from
> > another(remote) datacenter and kept replication factor 6 with 2
> > zookeeper servers one from each datacenter ). Now I brought down all 3
> > nodes of my local datacenter and produced few messages and I see
> > producer is working fine even my local data center nodes are down. It
> > successfully writes the messages to other data center nodes. But when
> > I'm trying to consume the messages the consumer.poll method gets stuck
> > as my local datacenter is down though other datacenter's nodes are up.
> >
> > My question is as the data has been written successfully to other
> > datacenter why consumer part is not working?
> >
> > Here is my Producer settings:
> >
> > props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,
> > psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.
> > sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3
> > -ab.sys.comcast.net:61617");
> > props.put("acks", "1");
> > props.put("max.block.ms", 1000);
> > props.put("key.serializer", "org.apache.kafka.common.serialization.
> > StringSerializer");
> > props.put("value.serializer", "com.comcast.ps.kafka.object.
> > CustomMessageSer");
> >
> > and here is Consumer settings:
> >
> > props.put("group.id", "app-consumer");
> >                 props.put("enable.auto.commit", "false");
> >                 props.put("auto.offset.reset", "earliest");
> >                 props.put("auto.commit.interval.ms", "500");
> >                 props.put("session.timeout.ms", "120000");
> >                 props.put("consumer.timeout.ms", "10000");
> >                 props.put("zookeeper.session.timeout.ms", "120000");
> >                 props.put("zookeeper.connection.timeout.ms", "60000");
> >                 props.put("offsets.storage","kafka");
> >                 props.put("request.timeout.ms", "150000");
> >                 props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net
> :
> > 61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,
> > psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3
> > -ab.sys.comcast.net:61617");
> >                 props.put("key.deserializer", "org.apache.kafka.common.
> > serialization.StringDeserializer");
> >                 props.put("value.deserializer",
> > "com.comcast.ps.kafka.object.CustomMessageDeSer");
> >
> > Is it because of consumer is not able to get the broker metadata if it
> > is trying to connect other datacenter's zookeeper server? I tried with
> > to increate the zookeeper session timeout and connection time out but no
> luck.
> >
> > Please help on this.
> > Thanks
> > Achintya
> >
> >
> > -----Original Message-----
> > From: Jason Gustafson [mailto:jason@confluent.io]
> > Sent: Wednesday, August 31, 2016 4:05 PM
> > To: users@kafka.apache.org
> > Cc: dev@kafka.apache.org
> > Subject: Re: Kafka consumers unable to process message
> >
> > Hi Achintya,
> >
> > We have a JIRA for this problem: https://issues.
> > apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise
> > an exception in this case or do you just want to keep it from blocking
> > indefinitely? If the latter, you could escape the poll from another
> > thread using wakeup().
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> > Achintya_Ghosh@comcast.com> wrote:
> >
> > > Hi there,
> > >
> > > Kafka consumer gets stuck at consumer.poll() method if my current
> > > datacenter is down and replicated messages are in remote datacenter.
> > >
> > > How to solve that issue?
> > >
> > > Thanks
> > > Achintya
> > >
> >
>

RE: Kafka consumers unable to process message

Posted by "Ghosh, Achintya (Contractor)" <Ac...@comcast.com>.
Hi Jason,

No, I didn't bring down any ZooKeeper server. I even tried with 3 ZooKeeper servers, one of them as an 'Observer', but I see the same issue.

Here is the server log from one of the nodes in my other datacenter:

[2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0. (kafka.log.Log)
[2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting  (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.ReplicaFetcherManager)
[2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
	at scala.Option.foreach(Option.scala:257)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
	at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4] Removed fetcher for partitions [TEST3,0] (kafka.server.ReplicaFetcherManager)
[2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting down (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped  (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown completed (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:48,025] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-09-01 01:26:48,034] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-09-01 01:26:48,035] INFO 4 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4: Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6 (kafka.cluster.Partition)
[2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)


Why is it trying to connect to node 3 of my local datacenter and throwing an IOException?

Thanks
Achintya

-----Original Message-----
From: Jason Gustafson [mailto:jason@confluent.io] 
Sent: Wednesday, August 31, 2016 10:26 PM
To: users@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

Hi Achintya,

Just to clarify, you did not take down either of the zookeepers in this test, right? Having only two zookeepers in the ensemble would mean that if either one of them failed, zookeeper wouldn't be able to reach quorum.
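
The quorum point can be made concrete with a little arithmetic. The sketch below is not ZooKeeper code; the class and method names are made up for illustration. It only assumes the usual majority rule (a quorum is more than half of the voting servers) and that Observer nodes do not vote.

```java
class QuorumMath {
    // Smallest number of voting servers that forms a strict majority.
    static int quorumSize(int voters) {
        return voters / 2 + 1;
    }

    // Number of voting servers that can fail while a majority remains.
    static int tolerableFailures(int voters) {
        return voters - quorumSize(voters);
    }

    public static void main(String[] args) {
        for (int voters = 1; voters <= 5; voters++) {
            System.out.println(voters + " voters: quorum=" + quorumSize(voters)
                    + ", tolerated failures=" + tolerableFailures(voters));
        }
    }
}
```

With 2 voters the quorum is 2, so no failure is tolerated. Adding a third server as an Observer still leaves only 2 voters, so it does not change this.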

I'm not entirely sure why this would happen. One possibility is that the consumer is failing to find the new coordinator, which might happen if all the replicas for one of the __consumer_offsets partitions were located in the "failed" datacenter. Perhaps you can enable DEBUG logging and post some logs so we can see what it's actually doing during poll().
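
To check the coordinator theory, it helps to know which __consumer_offsets partition a group maps to. The sketch below is an illustration, not broker code: it assumes the broker picks the partition as the absolute value of the group id's hash code modulo the partition count, and that the topic has the default 50 partitions (offsets.topic.num.partitions). The class and method names are hypothetical.

```java
class OffsetsPartitionSketch {
    // Assumed default for offsets.topic.num.partitions.
    static final int DEFAULT_OFFSETS_PARTITIONS = 50;

    // Absolute value that maps Integer.MIN_VALUE to 0 instead of overflowing.
    static int abs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Assumed mapping from a consumer group id to its offsets partition.
    static int partitionFor(String groupId, int numPartitions) {
        return abs(groupId.hashCode()) % numPartitions;
    }

    public static void main(String[] args) {
        int partition = partitionFor("app-consumer", DEFAULT_OFFSETS_PARTITIONS);
        System.out.println("group app-consumer -> __consumer_offsets-" + partition);
    }
}
```

The replica list for that partition can then be read from kafka-topics.sh --describe --topic __consumer_offsets. If the topic was created with the default offsets.topic.replication.factor of 3 rather than 6, all replicas of that partition could sit in one datacenter.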

By the way, I noticed that your consumer configuration settings seem a little mixed up. The new consumer doesn't actually communicate with Zookeeper, so there's no need for those settings. And you don't need to include the "offsets.storage" option since Kafka is the only choice. Also, I don't think "consumer.timeout.ms" is an option.
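
As a rough sketch of that cleanup, the consumer settings from the quoted message could be reduced as follows. It uses only java.util.Properties so it runs without the Kafka client on the classpath; the class and method names are hypothetical, and the values are copied from the original post rather than recommended.

```java
import java.util.Properties;
import java.util.TreeSet;

class ConsumerConfigSketch {
    // Builds the consumer settings without the ZooKeeper options,
    // "offsets.storage", and "consumer.timeout.ms" named above.
    static Properties consumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "app-consumer");
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        // Kept from the original post; it has no effect while auto commit is disabled.
        props.put("auto.commit.interval.ms", "500");
        props.put("session.timeout.ms", "120000");
        props.put("request.timeout.ms", "150000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "com.comcast.ps.kafka.object.CustomMessageDeSer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps(
                "psaq1-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617");
        // Print the settings in a stable, sorted order.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + "=" + props.getProperty(key));
        }
    }
}
```

The resulting Properties object would be passed to the KafkaConsumer constructor in place of the original one.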

-Jason


On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) < Achintya_Ghosh@comcast.com> wrote:

> Hi Jason,
>
> Thanks for your response.
>
> I know that is a known issue, and I worked around it by calling the
> wakeup() method from another thread. But my problem here is different.
> Let me explain; it's very basic.
>
> I created one cluster with 6 nodes (3 in one datacenter and 3 in
> another, remote datacenter), with replication factor 6 and 2
> ZooKeeper servers, one in each datacenter. I then brought down all 3
> nodes in my local datacenter and produced a few messages. The
> producer works fine even though my local datacenter nodes are down: it
> successfully writes the messages to the nodes in the other datacenter.
> But when I try to consume the messages, consumer.poll() gets stuck
> because my local datacenter is down, even though the other
> datacenter's nodes are up.
>
> My question is: since the data has been written successfully to the
> other datacenter, why is the consumer not working?
>
> Here are my producer settings:
>
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("acks", "1");
> props.put("max.block.ms", 1000);
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");
>
> And here are my consumer settings:
>
> props.put("group.id", "app-consumer");
> props.put("enable.auto.commit", "false");
> props.put("auto.offset.reset", "earliest");
> props.put("auto.commit.interval.ms", "500");
> props.put("session.timeout.ms", "120000");
> props.put("consumer.timeout.ms", "10000");
> props.put("zookeeper.session.timeout.ms", "120000");
> props.put("zookeeper.connection.timeout.ms", "60000");
> props.put("offsets.storage","kafka");
> props.put("request.timeout.ms", "150000");
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
>
> Is it because the consumer cannot get the broker metadata when it
> is trying to connect to the other datacenter's ZooKeeper server? I tried
> increasing the ZooKeeper session timeout and connection timeout, but no luck.
>
> Please help on this.
> Thanks
> Achintya
>
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 4:05 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> We have a JIRA for this problem: https://issues.
> apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise 
> an exception in this case or do you just want to keep it from blocking 
> indefinitely? If the latter, you could escape the poll from another 
> thread using wakeup().
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) < 
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi there,
> >
> > Kafka consumer gets stuck at consumer.poll() method if my current 
> > datacenter is down and replicated messages are in remote datacenter.
> >
> > How to solve that issue?
> >
> > Thanks
> > Achintya
> >
>

RE: Kafka consumers unable to process message

Posted by "Ghosh, Achintya (Contractor)" <Ac...@comcast.com>.
Hi Jason,

No, I didn't bring down any ZooKeeper server. I even tried with 3 ZooKeeper servers, one of them as an 'Observer', but I see the same issue.

Here is the server log from one of the nodes in my other datacenter:

[2016-09-01 01:25:19,221] INFO Truncating log TEST3-0 to offset 0. (kafka.log.Log)
[2016-09-01 01:25:19,257] INFO [ReplicaFetcherThread-0-3], Starting  (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:25:19,258] INFO [ReplicaFetcherManager on broker 4] Added fetcher for partitions List([[TEST3,0], initOffset 0 to broker BrokerEndPoint(3,psaq3-wc.sys.comcast.net,61616)] ) (kafka.server.ReplicaFetcherManager)
[2016-09-01 01:26:14,154] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6618a925 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 3 was disconnected before the response was read
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:87)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1$$anonfun$apply$1.apply(NetworkClientBlockingOps.scala:84)
	at scala.Option.foreach(Option.scala:257)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:84)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingSendAndReceive$extension$1.apply(NetworkClientBlockingOps.scala:80)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$2(NetworkClientBlockingOps.scala:137)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollContinuously$extension(NetworkClientBlockingOps.scala:143)
	at kafka.utils.NetworkClientBlockingOps$.blockingSendAndReceive$extension(NetworkClientBlockingOps.scala:80)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:244)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:16,189] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@6e7e2578 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:18,198] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@5adea8fb (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:20,223] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@4c159cc3 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:22,246] WARN [ReplicaFetcherThread-0-3], Error in fetch kafka.server.ReplicaFetcherThread$FetchRequest@611ed1e9 (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to psaq3-wc.sys.comcast.net:61616 (id: 3 rack: null) failed
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:63)
	at kafka.utils.NetworkClientBlockingOps$$anonfun$blockingReady$extension$2.apply(NetworkClientBlockingOps.scala:59)
	at kafka.utils.NetworkClientBlockingOps$.recursivePoll$1(NetworkClientBlockingOps.scala:112)
	at kafka.utils.NetworkClientBlockingOps$.kafka$utils$NetworkClientBlockingOps$$pollUntil$extension(NetworkClientBlockingOps.scala:120)
	at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:59)
	at kafka.server.ReplicaFetcherThread.sendRequest(ReplicaFetcherThread.scala:239)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:229)
	at kafka.server.ReplicaFetcherThread.fetch(ReplicaFetcherThread.scala:42)
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:107)
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:98)
	at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-09-01 01:26:22,736] INFO [ReplicaFetcherManager on broker 4] Removed fetcher for partitions [TEST3,0] (kafka.server.ReplicaFetcherManager)
[2016-09-01 01:26:22,752] INFO [ReplicaFetcherThread-0-3], Shutting down (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:22,755] INFO [ReplicaFetcherThread-0-3], Stopped  (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:22,756] INFO [ReplicaFetcherThread-0-3], Shutdown completed (kafka.server.ReplicaFetcherThread)
[2016-09-01 01:26:48,025] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2016-09-01 01:26:48,034] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2016-09-01 01:26:48,035] INFO 4 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2016-09-01 01:26:48,726] INFO New leader is 4 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2016-09-01 01:26:54,837] INFO Partition [TEST3,0] on broker 4: Shrinking ISR for partition [TEST3,0] from 4,5,6,1 to 4,5,6 (kafka.cluster.Partition)
[2016-09-01 01:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 01:43:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 01:53:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:03:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:13:04,928] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:23:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2016-09-01 02:33:04,926] INFO [Group Metadata Manager on Broker 4]: Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.GroupMetadataManager)


Why is it trying to connect to node 3 of my local datacenter and throwing an IOException?

Thanks
Achintya

-----Original Message-----
From: Jason Gustafson [mailto:jason@confluent.io] 
Sent: Wednesday, August 31, 2016 10:26 PM
To: users@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

Hi Achintya,

Just to clarify, you did not take down either of the zookeepers in this test, right? Having only two zookeepers in the ensemble would mean that if either one of them failed, zookeeper wouldn't be able to reach quorum.

I'm not entirely sure why this would happen. One possibility is that the consumer is failing to find the new coordinator, which might happen if all the replicas for one of the __consumer_offsets partitions were located in the "failed" datacenter. Perhaps you can enable DEBUG logging and post some logs so we can see what it's actually doing during poll().

By the way, I noticed that your consumer configuration settings seem a little mixed up. The new consumer doesn't actually communicate with Zookeeper, so there's no need for those settings. And you don't need to include the "offsets.storage" option since Kafka is the only choice. Also, I don't think "consumer.timeout.ms" is an option.

-Jason


On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) < Achintya_Ghosh@comcast.com> wrote:

> Hi Jason,
>
> Thanks for your response.
>
> I know that is a known issue and I resolved it calling wakeup method 
> by another thread. But here my problem is different, let me explain , 
> it's very basic
>
> I created one cluster with 6 nodes( 3 from one datacenter and 3 from
> another(remote) datacenter and kept replication factor 6 with 2 
> zookeeper servers one from each datacenter ). Now I brought down all 3 
> nodes of my local datacenter and produced few messages and I see 
> producer is working fine even my local data center nodes are down. It 
> successfully writes the messages to other data center nodes. But when 
> I'm trying to consume the messages the consumer.poll method gets stuck 
> as my local datacenter is down though other datacenter's nodes are up.
>
> My question is as the data has been written successfully to other 
> datacenter why consumer part is not working?
>
> Here is my Producer settings:
>
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("acks", "1");
> props.put("max.block.ms", 1000);
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");
>
> and here is Consumer settings:
>
> props.put("group.id", "app-consumer");
> props.put("enable.auto.commit", "false");
> props.put("auto.offset.reset", "earliest");
> props.put("auto.commit.interval.ms", "500");
> props.put("session.timeout.ms", "120000");
> props.put("consumer.timeout.ms", "10000");
> props.put("zookeeper.session.timeout.ms", "120000");
> props.put("zookeeper.connection.timeout.ms", "60000");
> props.put("offsets.storage","kafka");
> props.put("request.timeout.ms", "150000");
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
>
> Is it because of consumer is not able to get the broker metadata if it 
> is trying to connect other datacenter's zookeeper server? I tried with 
> to increate the zookeeper session timeout and connection time out but no luck.
>
> Please help on this.
> Thanks
> Achintya
>
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 4:05 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> We have a JIRA for this problem:
> https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> client to raise an exception in this case or do you just want to keep it
> from blocking indefinitely? If the latter, you could escape the poll from
> another thread using wakeup().
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) < 
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi there,
> >
> > Kafka consumer gets stuck at consumer.poll() method if my current 
> > datacenter is down and replicated messages are in remote datacenter.
> >
> > How to solve that issue?
> >
> > Thanks
> > Achintya
> >
>

Re: Kafka consumers unable to process message

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Achintya,

Just to clarify, you did not take down either of the zookeepers in this
test, right? Having only two zookeepers in the ensemble would mean that if
either one of them failed, zookeeper wouldn't be able to reach quorum.
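
The quorum arithmetic behind that point can be sketched as follows. This is a minimal illustration rather than ZooKeeper code: the class and method names are made up here, and it assumes the usual majority rule, under which an ensemble of n servers needs n/2 + 1 of them (integer division) to be reachable.

```java
// Hypothetical helper (not part of ZooKeeper): majority-quorum arithmetic.
final class QuorumMath {
    // Smallest number of servers that forms a strict majority of the ensemble.
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    // How many servers can fail while a majority is still reachable.
    static int toleratedFailures(int ensembleSize) {
        return ensembleSize - quorumSize(ensembleSize);
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 5; n++) {
            System.out.println(n + " servers: quorum=" + quorumSize(n)
                    + ", tolerated failures=" + toleratedFailures(n));
        }
    }
}
```

Under this rule an ensemble of two tolerates no failures, the same as a single server, while an ensemble of three tolerates one.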

I'm not entirely sure why this would happen. One possibility is that the
consumer is failing to find the new coordinator, which might happen if all
the replicas for one of the __consumer_offsets partitions were located in
the "failed" datacenter. Perhaps you can enable DEBUG logging and post some
logs so we can see what it's actually doing during poll().
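
To check the coordinator theory, it helps to know which __consumer_offsets partition a group maps to. The sketch below assumes the broker picks that partition as the absolute value of the group id's hash code modulo the number of offsets-topic partitions (offsets.topic.num.partitions, assumed here to be the default of 50). The class and method names are hypothetical, and both the formula and the partition count should be verified against the broker version in use.

```java
// Hypothetical helper: estimates which __consumer_offsets partition holds a group.
final class OffsetsPartition {
    // Absolute value that maps Integer.MIN_VALUE to 0 instead of overflowing.
    static int safeAbs(int n) {
        return n == Integer.MIN_VALUE ? 0 : Math.abs(n);
    }

    // Assumed mapping: abs(hash of group id) modulo the offsets-topic partition count.
    static int partitionFor(String groupId, int offsetsTopicPartitions) {
        return safeAbs(groupId.hashCode()) % offsetsTopicPartitions;
    }

    public static void main(String[] args) {
        int partition = partitionFor("app-consumer", 50); // 50 is an assumed default
        System.out.println("group app-consumer -> __consumer_offsets-" + partition);
    }
}
```

With the partition number in hand, the topic description for __consumer_offsets (for example from kafka-topics.sh --describe) shows whether every replica of that partition sits in the failed datacenter.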

By the way, I noticed that your consumer configuration settings seem a
little mixed up. The new consumer doesn't actually communicate with
Zookeeper, so there's no need for those settings. And you don't need to
include the "offsets.storage" option since Kafka is the only choice. Also,
I don't think "consumer.timeout.ms" is an option.
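
Applied to the consumer settings quoted below, that advice would leave roughly the following. This is a sketch only: it keeps the values from the original post, drops the ZooKeeper settings, "offsets.storage" and "consumer.timeout.ms", and also omits "auto.commit.interval.ms" on the assumption that it has no effect while "enable.auto.commit" is false. The class name is hypothetical.

```java
import java.util.Properties;

// Hypothetical holder for the trimmed-down new-consumer configuration.
final class NewConsumerConfig {
    static Properties build() {
        Properties props = new Properties();
        // Brokers from both datacenters, as in the original post.
        props.put("bootstrap.servers",
                "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,"
              + "psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,"
              + "psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
        props.put("group.id", "app-consumer");
        props.put("enable.auto.commit", "false");   // offsets are committed manually
        props.put("auto.offset.reset", "earliest");
        props.put("session.timeout.ms", "120000");
        props.put("request.timeout.ms", "150000");  // kept larger than the session timeout
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "com.comcast.ps.kafka.object.CustomMessageDeSer");
        return props;
    }

    public static void main(String[] args) {
        // Print the remaining keys in sorted order.
        build().stringPropertyNames().stream().sorted().forEach(System.out::println);
    }
}
```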

-Jason


On Wed, Aug 31, 2016 at 6:43 PM, Ghosh, Achintya (Contractor) <
Achintya_Ghosh@comcast.com> wrote:

> Hi Jason,
>
> Thanks for your response.
>
> I know that is a known issue and I resolved it calling wakeup method by
> another thread. But here my problem is different, let me explain , it's
> very basic
>
> I created one cluster with 6 nodes( 3 from one datacenter and 3 from
> another(remote) datacenter and kept replication factor 6 with 2 zookeeper
> servers one from each datacenter ). Now I brought down all 3 nodes of my
> local datacenter and produced few messages and I see producer is working
> fine even my local data center nodes are down. It successfully writes the
> messages to other data center nodes. But when I'm trying to consume the
> messages the consumer.poll method gets stuck as my local datacenter is down
> though other datacenter's nodes are up.
>
> My question is as the data has been written successfully to other
> datacenter why consumer part is not working?
>
> Here is my Producer settings:
>
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("acks", "1");
> props.put("max.block.ms", 1000);
> props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
> props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");
>
> and here is Consumer settings:
>
> props.put("group.id", "app-consumer");
> props.put("enable.auto.commit", "false");
> props.put("auto.offset.reset", "earliest");
> props.put("auto.commit.interval.ms", "500");
> props.put("session.timeout.ms", "120000");
> props.put("consumer.timeout.ms", "10000");
> props.put("zookeeper.session.timeout.ms", "120000");
> props.put("zookeeper.connection.timeout.ms", "60000");
> props.put("offsets.storage","kafka");
> props.put("request.timeout.ms", "150000");
> props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
> props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
> props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");
>
> Is it because of consumer is not able to get the broker metadata if it is
> trying to connect other datacenter's zookeeper server? I tried with to
> increate the zookeeper session timeout and connection time out but no luck.
>
> Please help on this.
> Thanks
> Achintya
>
>
> -----Original Message-----
> From: Jason Gustafson [mailto:jason@confluent.io]
> Sent: Wednesday, August 31, 2016 4:05 PM
> To: users@kafka.apache.org
> Cc: dev@kafka.apache.org
> Subject: Re: Kafka consumers unable to process message
>
> Hi Achintya,
>
> We have a JIRA for this problem:
> https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> client to raise an exception in this case or do you just want to keep it
> from blocking indefinitely? If the latter, you could escape the poll from
> another thread using wakeup().
>
> Thanks,
> Jason
>
> On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> Achintya_Ghosh@comcast.com> wrote:
>
> > Hi there,
> >
> > Kafka consumer gets stuck at consumer.poll() method if my current
> > datacenter is down and replicated messages are in remote datacenter.
> >
> > How to solve that issue?
> >
> > Thanks
> > Achintya
> >
>

RE: Kafka consumers unable to process message

Posted by "Ghosh, Achintya (Contractor)" <Ac...@comcast.com>.
Hi Jason,

Thanks for your response.

I know that is a known issue, and I worked around it by calling wakeup() from another thread. But my problem here is different. Let me explain; the setup is very basic.

I created one cluster with 6 nodes (3 in one datacenter and 3 in another, remote datacenter), with a replication factor of 6 and 2 ZooKeeper servers, one in each datacenter. I then brought down all 3 nodes of my local datacenter and produced a few messages. The producer works fine even though my local datacenter nodes are down: it successfully writes the messages to the nodes in the other datacenter. But when I try to consume the messages, consumer.poll() gets stuck because my local datacenter is down, even though the other datacenter's nodes are up.

My question is: since the data has been written successfully to the other datacenter, why is the consumer not working?

Here are my producer settings:

props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
props.put("acks", "1");
props.put("max.block.ms", 1000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "com.comcast.ps.kafka.object.CustomMessageSer");

And here are the consumer settings:

props.put("group.id", "app-consumer");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "earliest");
props.put("auto.commit.interval.ms", "500");
props.put("session.timeout.ms", "120000");
props.put("consumer.timeout.ms", "10000");
props.put("zookeeper.session.timeout.ms", "120000");
props.put("zookeeper.connection.timeout.ms", "60000");
props.put("offsets.storage","kafka");
props.put("request.timeout.ms", "150000");
props.put("bootstrap.servers", "psaq1-wc.sys.comcast.net:61616,psaq2-wc.sys.comcast.net:61616,psaq3-wc.sys.comcast.net:61616,psaq1-ab.sys.comcast.net:61617,psaq2-ab.sys.comcast.net:61617,psaq3-ab.sys.comcast.net:61617");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "com.comcast.ps.kafka.object.CustomMessageDeSer");

Is it because the consumer cannot get the broker metadata when it tries to connect to the other datacenter's ZooKeeper server? I tried increasing the ZooKeeper session timeout and connection timeout, but no luck.

Please help with this.
Thanks
Achintya


-----Original Message-----
From: Jason Gustafson [mailto:jason@confluent.io] 
Sent: Wednesday, August 31, 2016 4:05 PM
To: users@kafka.apache.org
Cc: dev@kafka.apache.org
Subject: Re: Kafka consumers unable to process message

Hi Achintya,

We have a JIRA for this problem: https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the client to raise an exception in this case or do you just want to keep it from blocking indefinitely? If the latter, you could escape the poll from another thread using wakeup().

Thanks,
Jason

On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) < Achintya_Ghosh@comcast.com> wrote:

> Hi there,
>
> Kafka consumer gets stuck at consumer.poll() method if my current 
> datacenter is down and replicated messages are in remote datacenter.
>
> How to solve that issue?
>
> Thanks
> Achintya
>

Re: Kafka consumers unable to process message

Posted by Jason Gustafson <ja...@confluent.io>.
Kafka clients have tended to make broker retries transparent to the user.
There's been discussion on various JIRAs about what we should do when all
the known brokers become unreachable. One option is to revert to the
configured bootstrap broker list, which is nice if you've configured a VIP
for bootstrapping. More generally, we've discussed introducing a pluggable
interface for broker discovery, which allows for integration with service
discovery frameworks like Consul. I'm supportive of this option, but we
probably need a champion with a little more time to investigate the options
and push it through. For the JIRA that I linked to above, I'm inclined to
have poll() silently retry since that is consistent with current behavior,
but it should not block longer than the passed timeout.
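
The behavior proposed for poll() in that last sentence, retrying quietly but never past the caller's timeout, can be illustrated with a small stand-alone sketch. It is not the consumer's implementation: BoundedRetry, its parameters, and the fixed backoff are assumptions made for illustration, and each attempt is assumed to return promptly.

```java
import java.util.Optional;
import java.util.function.Supplier;

// Hypothetical illustration of "retry silently, but respect the caller's timeout".
final class BoundedRetry {
    // Calls `attempt` until it yields a value or timeoutMs has elapsed.
    // Empty results are retried without raising, after a pause of backoffMs.
    static <T> Optional<T> retryUntil(Supplier<Optional<T>> attempt,
                                      long timeoutMs, long backoffMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            Optional<T> result = attempt.get();
            if (result.isPresent()) {
                return result;
            }
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return Optional.empty();   // give up at the deadline, no exception
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(backoffMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for a fetch that fails twice before a broker becomes reachable.
        Optional<String> ok = retryUntil(
                () -> ++calls[0] < 3 ? Optional.<String>empty() : Optional.of("records"),
                1_000, 10);
        System.out.println(ok + " after " + calls[0] + " attempts");

        // Stand-in for a fetch that never succeeds: returns empty once the timeout passes.
        Optional<String> none = retryUntil(() -> Optional.<String>empty(), 100, 10);
        System.out.println("never reachable: " + none);
    }
}
```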

-Jason

On Wed, Aug 31, 2016 at 3:00 PM, Jim Jagielski <ji...@jagunet.com> wrote:

> Yeah, let's figure out the "best" action to take...
>
> Looks like something I'd like to get a handle on.
>
> > On Aug 31, 2016, at 4:05 PM, Jason Gustafson <ja...@confluent.io> wrote:
> >
> > Hi Achintya,
> >
> > We have a JIRA for this problem:
> > https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> > client to raise an exception in this case or do you just want to keep it
> > from blocking indefinitely? If the latter, you could escape the poll from
> > another thread using wakeup().
> >
> > Thanks,
> > Jason
> >
> > On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> > Achintya_Ghosh@comcast.com> wrote:
> >
> >> Hi there,
> >>
> >> Kafka consumer gets stuck at consumer.poll() method if my current
> >> datacenter is down and replicated messages are in remote datacenter.
> >>
> >> How to solve that issue?
> >>
> >> Thanks
> >> Achintya
> >>
>
>

Re: Kafka consumers unable to process message

Posted by Jim Jagielski <ji...@jaguNET.com>.
Yeah, let's figure out the "best" action to take...

Looks like something I'd like to get a handle on.

> On Aug 31, 2016, at 4:05 PM, Jason Gustafson <ja...@confluent.io> wrote:
> 
> Hi Achintya,
> 
> We have a JIRA for this problem:
> https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the
> client to raise an exception in this case or do you just want to keep it
> from blocking indefinitely? If the latter, you could escape the poll from
> another thread using wakeup().
> 
> Thanks,
> Jason
> 
> On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
> Achintya_Ghosh@comcast.com> wrote:
> 
>> Hi there,
>> 
>> Kafka consumer gets stuck at consumer.poll() method if my current
>> datacenter is down and replicated messages are in remote datacenter.
>> 
>> How to solve that issue?
>> 
>> Thanks
>> Achintya
>> 


Re: Kafka consumers unable to process message

Posted by Jason Gustafson <ja...@confluent.io>.
Hi Achintya,

We have a JIRA for this problem:
https://issues.apache.org/jira/browse/KAFKA-3834. Do you expect the client
to raise an exception in this case or do you just want to keep it from
blocking indefinitely? If the latter, you could escape the poll from another
thread using wakeup().
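
The wakeup() workaround might look like the sketch below. To keep it runnable without the Kafka client on the classpath, the consumer is replaced by a stand-in: the helper only needs a Runnable, which in real code would be consumer::wakeup, and the blocked call would be consumer.poll(...), which throws WakeupException when interrupted this way. PollWatchdog and its method names are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical watchdog that releases a blocked call from another thread.
final class PollWatchdog {
    // Schedules `wakeup` to run on the timer thread after timeoutMs.
    // With a real consumer, pass consumer::wakeup.
    static ScheduledFuture<?> arm(ScheduledExecutorService timer,
                                  Runnable wakeup, long timeoutMs) {
        return timer.schedule(wakeup, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Demo with a stand-in for a poll() that never returns by itself.
    // Returns true if the watchdog released the blocked call.
    static boolean demo(long timeoutMs) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch stuckPoll = new CountDownLatch(1); // stand-in for a blocked poll()
        try {
            ScheduledFuture<?> watchdog = arm(timer, stuckPoll::countDown, timeoutMs);
            // Blocks until the watchdog fires; the upper bound keeps the demo from hanging.
            boolean released = stuckPoll.await(timeoutMs + 5_000, TimeUnit.MILLISECONDS);
            watchdog.cancel(false); // in real code: cancel once poll() returns normally
            return released;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("released by watchdog: " + demo(200));
    }
}
```

In a real consumer loop the WakeupException would be caught around poll(), after which the application can decide whether to retry, report an error, or close the consumer.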

Thanks,
Jason

On Wed, Aug 31, 2016 at 12:11 PM, Ghosh, Achintya (Contractor) <
Achintya_Ghosh@comcast.com> wrote:

> Hi there,
>
> Kafka consumer gets stuck at consumer.poll() method if my current
> datacenter is down and replicated messages are in remote datacenter.
>
> How to solve that issue?
>
> Thanks
> Achintya
>
