You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Shashi Vishwakarma <sh...@gmail.com> on 2016/03/03 19:45:02 UTC

Kafka | Unable to publish data to broker - ClosedChannelException

Hi

I am trying to run simple kafka producer consumer example on HDP but facing
below exception.

[2016-03-03 18:26:38,683] WARN Fetching topic metadata with
correlation id 0 for topics [Set(page_visits)] from broker
[BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
(kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2016-03-03 18:26:38,688] ERROR fetching topic metadata for topics
[Set(page_visits)] from broker
[ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
(kafka.utils.CoreUtils$)
kafka.common.KafkaException: fetching topic metadata for topics
[Set(page_visits)] from broker
[ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        ... 12 more
[2016-03-03 18:26:38,693] WARN Fetching topic metadata with
correlation id 1 for topics [Set(page_visits)] from broker
[BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
(kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException

Here is command that I am using for producer.

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:9092
 --topic page_visits

After doing bit of googling , I found that I need to add
advertised.host.name property in server.properties file . Here is my
server.properties file.

# Generated by Apache Ambari. Thu Mar  3 18:12:50 2016
advertised.host.name=sandbox.hortonworks.com
auto.create.topics.enable=true
auto.leader.rebalance.enable=truebroker.id=0
compression.type=producer
controlled.shutdown.enable=true
controlled.shutdown.max.retries=3controlled.shutdown.retry.backoff.ms=5000
controller.message.queue.size=10controller.socket.timeout.ms=30000
default.replication.factor=1
delete.topic.enable=false
fetch.purgatory.purge.interval.requests=10000host.name=sandbox.hortonworks.com
kafka.ganglia.metrics.group=kafka
kafka.ganglia.metrics.host=localhost
kafka.ganglia.metrics.port=8671
kafka.ganglia.metrics.reporter.enabled=true
kafka.metrics.reporters=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter
kafka.timeline.metrics.host=sandbox.hortonworks.com
kafka.timeline.metrics.maxRowCacheSize=10000
kafka.timeline.metrics.port=6188
kafka.timeline.metrics.reporter.enabled=true
kafka.timeline.metrics.reporter.sendInterval=5900
leader.imbalance.check.interval.seconds=300
leader.imbalance.per.broker.percentage=10
listeners=PLAINTEXT://sandbox.hortonworks.com:6667
log.cleanup.interval.mins=10
log.dirs=/kafka-logs
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.bytes=-1
log.retention.hours=168
log.roll.hours=168
log.segment.bytes=1073741824
message.max.bytes=1000000
min.insync.replicas=1
num.io.threads=8
num.network.threads=3
num.partitions=1
num.recovery.threads.per.data.dir=1
num.replica.fetchers=1
offset.metadata.max.bytes=4096
offsets.commit.required.acks=-1offsets.commit.timeout.ms=5000
offsets.load.buffer.size=5242880offsets.retention.check.interval.ms=600000
offsets.retention.minutes=86400000
offsets.topic.compression.codec=0
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.topic.segment.bytes=104857600
producer.purgatory.purge.interval.requests=10000
queued.max.requests=500
replica.fetch.max.bytes=1048576
replica.fetch.min.bytes=1replica.fetch.wait.max.ms=500replica.high.watermark.checkpoint.interval.ms=5000
replica.lag.max.messages=4000replica.lag.time.max.ms=10000
replica.socket.receive.buffer.bytes=65536replica.socket.timeout.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
zookeeper.connect=sandbox.hortonworks.com:2181zookeeper.connection.timeout.ms=15000zookeeper.session.timeout.ms=30000zookeeper.sync.time.ms=2000

After adding property i am getting same exception.

Any suggestion.

Thanks

Shashi

Re: Kafka | Unable to publish data to broker - ClosedChannelException

Posted by Banias H <ba...@gmail.com>.
It sounds like an issue with the topic. You can try describe the page_vists
topic like:

./kafka-topics.sh --zookeeper localhost:2181 --describe

and make sure it has a leader. If not, I would suggest creating a new topic
and try if you can send messages to it.


On Fri, Mar 4, 2016 at 3:34 AM, Shashi Vishwakarma <shashi.vish123@gmail.com
> wrote:

> Hi
>
> I changed command to
>
> ./kafka-console-producer.sh --broker-list  sandbox.hortonworks.com:6667
> --topic page_visits
>
> Exception changed but no success.
>
>
> [2016-03-04 09:26:12,742] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:12,754] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:12,755] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: page_visits
> (kafka.producer.async.DefaultEventHandler)
> [2016-03-04 09:26:12,865] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:12,873] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:12,873] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: page_visits
> (kafka.producer.async.DefaultEventHandler)
> [2016-03-04 09:26:12,979] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:12,985] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:12,985] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: page_visits
> (kafka.producer.async.DefaultEventHandler)
> [2016-03-04 09:26:13,095] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:13,107] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:13,107] ERROR Failed to collate messages by topic,
> partition due to: Failed to fetch topic metadata for topic: page_visits
> (kafka.producer.async.DefaultEventHandler)
> [2016-03-04 09:26:13,215] WARN Error while fetching metadata
> [{TopicMetadata for topic page_visits ->
> No partition metadata for topic page_visits due to
> kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
> kafka.common.LeaderNotAvailableException
>  (kafka.producer.BrokerPartitionInfo)
> [2016-03-04 09:26:13,217] ERROR Failed to send requests for topics
> page_visits with correlation ids in [0,8]
> (kafka.producer.async.DefaultEventHandler)
> [2016-03-04 09:26:13,223] ERROR Error in handling batch of 1 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
>         at
>
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
>         at
>
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
>
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
>
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>
>
>
> Can you point where i am going wrong?
>
> Thanks
> Shashi
>
> On Fri, Mar 4, 2016 at 2:48 AM, Banias H <ba...@gmail.com> wrote:
>
> > Try changing the port like below.
> >
> > ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:
> > <http://sandbox.hortonworks.com:9092/>*6667* --topic page_visits
> >
> > -B
> >
> > On Thu, Mar 3, 2016 at 12:45 PM, Shashi Vishwakarma <
> > shashi.vish123@gmail.com> wrote:
> >
> > > Hi
> > >
> > > I am trying to run simple kafka producer consumer example on HDP but
> > facing
> > > below exception.
> > >
> > > [2016-03-03 18:26:38,683] WARN Fetching topic metadata with
> > > correlation id 0 for topics [Set(page_visits)] from broker
> > > [BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
> > > (kafka.client.ClientUtils$)
> > > java.nio.channels.ClosedChannelException
> > >         at
> kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
> > >         at
> > kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > >         at
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
> > >         at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
> > >         at
> > > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> > >         at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > >         at
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
> > >         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
> > >         at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > >         at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
> > >         at
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > >         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >         at
> > >
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > > [2016-03-03 18:26:38,688] ERROR fetching topic metadata for topics
> > > [Set(page_visits)] from broker
> > > [ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
> > > (kafka.utils.CoreUtils$)
> > > kafka.common.KafkaException: fetching topic metadata for topics
> > > [Set(page_visits)] from broker
> > > [ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
> > >         at
> > > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
> > >         at
> > >
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> > >         at
> > >
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
> > >         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
> > >         at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> > >         at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
> > >         at
> > >
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> > >         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> > >         at
> > >
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> > >         at
> > >
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > > Caused by: java.nio.channels.ClosedChannelException
> > >         at
> kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
> > >         at
> > kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> > >         at
> > >
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
> > >         at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
> > >         at
> > > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> > >         ... 12 more
> > > [2016-03-03 18:26:38,693] WARN Fetching topic metadata with
> > > correlation id 1 for topics [Set(page_visits)] from broker
> > > [BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
> > > (kafka.client.ClientUtils$)
> > > java.nio.channels.ClosedChannelException
> > >
> > > Here is command that I am using for producer.
> > >
> > > ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:9092
> > >  --topic page_visits
> > >
> > > After doing bit of googling , I found that I need to add
> > > advertised.host.name property in server.properties file . Here is my
> > > server.properties file.
> > >
> > > # Generated by Apache Ambari. Thu Mar  3 18:12:50 2016
> > > advertised.host.name=sandbox.hortonworks.com
> > > auto.create.topics.enable=true
> > > auto.leader.rebalance.enable=truebroker.id=0
> > > compression.type=producer
> > > controlled.shutdown.enable=true
> > > controlled.shutdown.max.retries=3controlled.shutdown.retry.backoff.ms
> > =5000
> > > controller.message.queue.size=10controller.socket.timeout.ms=30000
> > > default.replication.factor=1
> > > delete.topic.enable=false
> > > fetch.purgatory.purge.interval.requests=10000host.name=
> > > sandbox.hortonworks.com
> > > kafka.ganglia.metrics.group=kafka
> > > kafka.ganglia.metrics.host=localhost
> > > kafka.ganglia.metrics.port=8671
> > > kafka.ganglia.metrics.reporter.enabled=true
> > >
> > >
> >
> kafka.metrics.reporters=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter
> > > kafka.timeline.metrics.host=sandbox.hortonworks.com
> > > kafka.timeline.metrics.maxRowCacheSize=10000
> > > kafka.timeline.metrics.port=6188
> > > kafka.timeline.metrics.reporter.enabled=true
> > > kafka.timeline.metrics.reporter.sendInterval=5900
> > > leader.imbalance.check.interval.seconds=300
> > > leader.imbalance.per.broker.percentage=10
> > > listeners=PLAINTEXT://sandbox.hortonworks.com:6667
> > > log.cleanup.interval.mins=10
> > > log.dirs=/kafka-logs
> > > log.index.interval.bytes=4096
> > > log.index.size.max.bytes=10485760
> > > log.retention.bytes=-1
> > > log.retention.hours=168
> > > log.roll.hours=168
> > > log.segment.bytes=1073741824
> > > message.max.bytes=1000000
> > > min.insync.replicas=1
> > > num.io.threads=8
> > > num.network.threads=3
> > > num.partitions=1
> > > num.recovery.threads.per.data.dir=1
> > > num.replica.fetchers=1
> > > offset.metadata.max.bytes=4096
> > > offsets.commit.required.acks=-1offsets.commit.timeout.ms=5000
> > > offsets.load.buffer.size=5242880offsets.retention.check.interval.ms
> > =600000
> > > offsets.retention.minutes=86400000
> > > offsets.topic.compression.codec=0
> > > offsets.topic.num.partitions=50
> > > offsets.topic.replication.factor=3
> > > offsets.topic.segment.bytes=104857600
> > > producer.purgatory.purge.interval.requests=10000
> > > queued.max.requests=500
> > > replica.fetch.max.bytes=1048576
> > > replica.fetch.min.bytes=1replica.fetch.wait.max.ms=
> > > 500replica.high.watermark.checkpoint.interval.ms=5000
> > > replica.lag.max.messages=4000replica.lag.time.max.ms=10000
> > > replica.socket.receive.buffer.bytes=65536replica.socket.timeout.ms
> =30000
> > > socket.receive.buffer.bytes=102400
> > > socket.request.max.bytes=104857600
> > > socket.send.buffer.bytes=102400
> > > zookeeper.connect=sandbox.hortonworks.com:2181z
> > > ookeeper.connection.timeout.ms=15000zookeeper.session.timeout.ms=
> > > 30000zookeeper.sync.time.ms=2000
> > >
> > > After adding property i am getting same exception.
> > >
> > > Any suggestion.
> > >
> > > Thanks
> > >
> > > Shashi
> > >
> >
>

Re: Kafka | Unable to publish data to broker - ClosedChannelException

Posted by Shashi Vishwakarma <sh...@gmail.com>.
Hi

I changed command to

./kafka-console-producer.sh --broker-list  sandbox.hortonworks.com:6667
--topic page_visits

Exception changed but no success.


[2016-03-04 09:26:12,742] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:12,754] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:12,755] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: page_visits
(kafka.producer.async.DefaultEventHandler)
[2016-03-04 09:26:12,865] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:12,873] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:12,873] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: page_visits
(kafka.producer.async.DefaultEventHandler)
[2016-03-04 09:26:12,979] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:12,985] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:12,985] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: page_visits
(kafka.producer.async.DefaultEventHandler)
[2016-03-04 09:26:13,095] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:13,107] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:13,107] ERROR Failed to collate messages by topic,
partition due to: Failed to fetch topic metadata for topic: page_visits
(kafka.producer.async.DefaultEventHandler)
[2016-03-04 09:26:13,215] WARN Error while fetching metadata
[{TopicMetadata for topic page_visits ->
No partition metadata for topic page_visits due to
kafka.common.LeaderNotAvailableException}] for topic [page_visits]: class
kafka.common.LeaderNotAvailableException
 (kafka.producer.BrokerPartitionInfo)
[2016-03-04 09:26:13,217] ERROR Failed to send requests for topics
page_visits with correlation ids in [0,8]
(kafka.producer.async.DefaultEventHandler)
[2016-03-04 09:26:13,223] ERROR Error in handling batch of 1 events
(kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3
tries.
        at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:91)
        at
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at
kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at
kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)



Can you point where i am going wrong?

Thanks
Shashi

On Fri, Mar 4, 2016 at 2:48 AM, Banias H <ba...@gmail.com> wrote:

> Try changing the port like below.
>
> ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:
> <http://sandbox.hortonworks.com:9092/>*6667* --topic page_visits
>
> -B
>
> On Thu, Mar 3, 2016 at 12:45 PM, Shashi Vishwakarma <
> shashi.vish123@gmail.com> wrote:
>
> > Hi
> >
> > I am trying to run simple kafka producer consumer example on HDP but
> facing
> > below exception.
> >
> > [2016-03-03 18:26:38,683] WARN Fetching topic metadata with
> > correlation id 0 for topics [Set(page_visits)] from broker
> > [BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
> > (kafka.client.ClientUtils$)
> > java.nio.channels.ClosedChannelException
> >         at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
> >         at
> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> >         at
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
> >         at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
> >         at
> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> >         at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >         at
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
> >         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
> >         at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >         at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
> >         at
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
> >         at
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> >         at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> >         at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> >         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >         at
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> >         at
> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > [2016-03-03 18:26:38,688] ERROR fetching topic metadata for topics
> > [Set(page_visits)] from broker
> > [ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
> > (kafka.utils.CoreUtils$)
> > kafka.common.KafkaException: fetching topic metadata for topics
> > [Set(page_visits)] from broker
> > [ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
> >         at
> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
> >         at
> >
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> >         at
> >
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
> >         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
> >         at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> >         at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
> >         at
> >
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
> >         at
> >
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
> >         at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
> >         at
> >
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
> >         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
> >         at
> >
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
> >         at
> > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> > Caused by: java.nio.channels.ClosedChannelException
> >         at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
> >         at
> kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
> >         at
> >
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
> >         at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
> >         at
> > kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
> >         ... 12 more
> > [2016-03-03 18:26:38,693] WARN Fetching topic metadata with
> > correlation id 1 for topics [Set(page_visits)] from broker
> > [BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
> > (kafka.client.ClientUtils$)
> > java.nio.channels.ClosedChannelException
> >
> > Here is command that I am using for producer.
> >
> > ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:9092
> >  --topic page_visits
> >
> > After doing bit of googling , I found that I need to add
> > advertised.host.name property in server.properties file . Here is my
> > server.properties file.
> >
> > # Generated by Apache Ambari. Thu Mar  3 18:12:50 2016
> > advertised.host.name=sandbox.hortonworks.com
> > auto.create.topics.enable=true
> > auto.leader.rebalance.enable=truebroker.id=0
> > compression.type=producer
> > controlled.shutdown.enable=true
> > controlled.shutdown.max.retries=3controlled.shutdown.retry.backoff.ms
> =5000
> > controller.message.queue.size=10controller.socket.timeout.ms=30000
> > default.replication.factor=1
> > delete.topic.enable=false
> > fetch.purgatory.purge.interval.requests=10000host.name=
> > sandbox.hortonworks.com
> > kafka.ganglia.metrics.group=kafka
> > kafka.ganglia.metrics.host=localhost
> > kafka.ganglia.metrics.port=8671
> > kafka.ganglia.metrics.reporter.enabled=true
> >
> >
> kafka.metrics.reporters=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter
> > kafka.timeline.metrics.host=sandbox.hortonworks.com
> > kafka.timeline.metrics.maxRowCacheSize=10000
> > kafka.timeline.metrics.port=6188
> > kafka.timeline.metrics.reporter.enabled=true
> > kafka.timeline.metrics.reporter.sendInterval=5900
> > leader.imbalance.check.interval.seconds=300
> > leader.imbalance.per.broker.percentage=10
> > listeners=PLAINTEXT://sandbox.hortonworks.com:6667
> > log.cleanup.interval.mins=10
> > log.dirs=/kafka-logs
> > log.index.interval.bytes=4096
> > log.index.size.max.bytes=10485760
> > log.retention.bytes=-1
> > log.retention.hours=168
> > log.roll.hours=168
> > log.segment.bytes=1073741824
> > message.max.bytes=1000000
> > min.insync.replicas=1
> > num.io.threads=8
> > num.network.threads=3
> > num.partitions=1
> > num.recovery.threads.per.data.dir=1
> > num.replica.fetchers=1
> > offset.metadata.max.bytes=4096
> > offsets.commit.required.acks=-1offsets.commit.timeout.ms=5000
> > offsets.load.buffer.size=5242880offsets.retention.check.interval.ms
> =600000
> > offsets.retention.minutes=86400000
> > offsets.topic.compression.codec=0
> > offsets.topic.num.partitions=50
> > offsets.topic.replication.factor=3
> > offsets.topic.segment.bytes=104857600
> > producer.purgatory.purge.interval.requests=10000
> > queued.max.requests=500
> > replica.fetch.max.bytes=1048576
> > replica.fetch.min.bytes=1replica.fetch.wait.max.ms=
> > 500replica.high.watermark.checkpoint.interval.ms=5000
> > replica.lag.max.messages=4000replica.lag.time.max.ms=10000
> > replica.socket.receive.buffer.bytes=65536replica.socket.timeout.ms=30000
> > socket.receive.buffer.bytes=102400
> > socket.request.max.bytes=104857600
> > socket.send.buffer.bytes=102400
> > zookeeper.connect=sandbox.hortonworks.com:2181z
> > ookeeper.connection.timeout.ms=15000zookeeper.session.timeout.ms=
> > 30000zookeeper.sync.time.ms=2000
> >
> > After adding property i am getting same exception.
> >
> > Any suggestion.
> >
> > Thanks
> >
> > Shashi
> >
>

Re: Kafka | Unable to publish data to broker - ClosedChannelException

Posted by Banias H <ba...@gmail.com>.
Try changing the port like below.

./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:
<http://sandbox.hortonworks.com:9092/>*6667* --topic page_visits

-B

On Thu, Mar 3, 2016 at 12:45 PM, Shashi Vishwakarma <
shashi.vish123@gmail.com> wrote:

> Hi
>
> I am trying to run simple kafka producer consumer example on HDP but facing
> below exception.
>
> [2016-03-03 18:26:38,683] WARN Fetching topic metadata with
> correlation id 0 for topics [Set(page_visits)] from broker
> [BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
>         at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
>         at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
>         at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>         at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2016-03-03 18:26:38,688] ERROR fetching topic metadata for topics
> [Set(page_visits)] from broker
> [ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
> (kafka.utils.CoreUtils$)
> kafka.common.KafkaException: fetching topic metadata for topics
> [Set(page_visits)] from broker
> [ArrayBuffer(BrokerEndPoint(0,sandbox.hortonworks.com,9092))] failed
>         at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
>         at
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:68)
>         at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:89)
>         at kafka.utils.Logging$class.swallowError(Logging.scala:106)
>         at kafka.utils.CoreUtils$.swallowError(CoreUtils.scala:51)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:68)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> Caused by: java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:120)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:75)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:115)
>         at
> kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
>         ... 12 more
> [2016-03-03 18:26:38,693] WARN Fetching topic metadata with
> correlation id 1 for topics [Set(page_visits)] from broker
> [BrokerEndPoint(0,sandbox.hortonworks.com,9092)] failed
> (kafka.client.ClientUtils$)
> java.nio.channels.ClosedChannelException
>
> Here is command that I am using for producer.
>
> ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:9092
>  --topic page_visits
>
> After doing bit of googling , I found that I need to add
> advertised.host.name property in server.properties file . Here is my
> server.properties file.
>
> # Generated by Apache Ambari. Thu Mar  3 18:12:50 2016
> advertised.host.name=sandbox.hortonworks.com
> auto.create.topics.enable=true
> auto.leader.rebalance.enable=truebroker.id=0
> compression.type=producer
> controlled.shutdown.enable=true
> controlled.shutdown.max.retries=3controlled.shutdown.retry.backoff.ms=5000
> controller.message.queue.size=10controller.socket.timeout.ms=30000
> default.replication.factor=1
> delete.topic.enable=false
> fetch.purgatory.purge.interval.requests=10000host.name=
> sandbox.hortonworks.com
> kafka.ganglia.metrics.group=kafka
> kafka.ganglia.metrics.host=localhost
> kafka.ganglia.metrics.port=8671
> kafka.ganglia.metrics.reporter.enabled=true
>
> kafka.metrics.reporters=org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter
> kafka.timeline.metrics.host=sandbox.hortonworks.com
> kafka.timeline.metrics.maxRowCacheSize=10000
> kafka.timeline.metrics.port=6188
> kafka.timeline.metrics.reporter.enabled=true
> kafka.timeline.metrics.reporter.sendInterval=5900
> leader.imbalance.check.interval.seconds=300
> leader.imbalance.per.broker.percentage=10
> listeners=PLAINTEXT://sandbox.hortonworks.com:6667
> log.cleanup.interval.mins=10
> log.dirs=/kafka-logs
> log.index.interval.bytes=4096
> log.index.size.max.bytes=10485760
> log.retention.bytes=-1
> log.retention.hours=168
> log.roll.hours=168
> log.segment.bytes=1073741824
> message.max.bytes=1000000
> min.insync.replicas=1
> num.io.threads=8
> num.network.threads=3
> num.partitions=1
> num.recovery.threads.per.data.dir=1
> num.replica.fetchers=1
> offset.metadata.max.bytes=4096
> offsets.commit.required.acks=-1offsets.commit.timeout.ms=5000
> offsets.load.buffer.size=5242880offsets.retention.check.interval.ms=600000
> offsets.retention.minutes=86400000
> offsets.topic.compression.codec=0
> offsets.topic.num.partitions=50
> offsets.topic.replication.factor=3
> offsets.topic.segment.bytes=104857600
> producer.purgatory.purge.interval.requests=10000
> queued.max.requests=500
> replica.fetch.max.bytes=1048576
> replica.fetch.min.bytes=1replica.fetch.wait.max.ms=
> 500replica.high.watermark.checkpoint.interval.ms=5000
> replica.lag.max.messages=4000replica.lag.time.max.ms=10000
> replica.socket.receive.buffer.bytes=65536replica.socket.timeout.ms=30000
> socket.receive.buffer.bytes=102400
> socket.request.max.bytes=104857600
> socket.send.buffer.bytes=102400
> zookeeper.connect=sandbox.hortonworks.com:2181z
> ookeeper.connection.timeout.ms=15000zookeeper.session.timeout.ms=
> 30000zookeeper.sync.time.ms=2000
>
> After adding property i am getting same exception.
>
> Any suggestion.
>
> Thanks
>
> Shashi
>