You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Shrikant Patel <SP...@pdxinc.com> on 2015/09/26 07:38:13 UTC

Kafka on AWS - issue publishing from remote producer

I have Kafka 2.10-0.8.2.1 and zookeeper installed on the AWS EC2. The instance is working fine.

[kafka@ip-xx-xx-xx-xx bin]$ ./kafka-topics.sh --topic topic1 --zookeeper localhost:2181 --describe
Topic:topic1    PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topic1   Partition: 0    Leader: 0       Replicas: 0     Isr: 0

When producer running on my local machine (remote) tries to publish to topic1 it fails with exception below. It does create the topic1 (If topic1 didn't exist), Also on kakfa log I do see that producer was able to connect to kafka server.
[2015-09-26 01:24:26,874] INFO Closing socket connection to /X0X.23X.X4X.X3X. (kafka.network.Processor). These indicate the local producer is able to connect to kafka instance on AWS.

If I run producer on Kafka server itself, its able to publish to topic1 without any issue. I am not sure I understand what the issue with local producer. Any pointer will be helpful???

C:\JAVA_INSTALLATION\kafka\kafka_2.10-0.8.2.1>bin\windows\kafka-console-producer.bat --broker-list public-ip-address-of-kafka-ec2:9092 --topic topic1 --queue-enqueuetimeout-ms 10000 --request-timeout-ms 10000
[2015-09-25 23:08:00,146] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
12
[2015-09-25 23:08:02,977] WARN Failed to send producer request with correlation id 2 to broker 0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
       at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:03,653] WARN Failed to send producer request with correlation id 5 to broker 0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:04,296] WARN Failed to send producer request with correlation id 8 to broker 0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:04,954] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1 with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
[2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)


[2015-09-25 23:08:04,954] WARN Failed to send producer request with correlation id 11 to broker 0 with data for partitions [topic1,0] (kafka.producer.async.DefaultEventHandler)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
        at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
        at kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
        at kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
        at kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
[2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1 with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
[2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)

Thanks,
Shri


________________________________
This message and its contents (to include attachments) are the property of National Health Systems, Inc. and may contain confidential and proprietary information. This email and any files transmitted with it are intended solely for the use of the individual or entity to whom they are addressed. You are hereby notified that any unauthorized disclosure, copying, or distribution of this message, or the taking of any unauthorized action based on information contained herein is strictly prohibited. Unauthorized use of information contained herein may subject you to civil and criminal prosecution and penalties. If you are not the intended recipient, you should delete this message immediately and notify the sender immediately by telephone or by replying to this transmission.

RE: Kafka on AWS - issue publishing from remote producer

Posted by Shrikant Patel <SP...@pdxinc.com>.
Joe,

Thanks for your help. Updating the server properties with ec2 instance's hostname fixed it.

-Shri

-----Original Message-----
From: Joe Lawson [mailto:jlawson@opensourceconnections.com]
Sent: Saturday, September 26, 2015 9:25 AM
To: users@kafka.apache.org
Subject: Re: Kafka on AWS - issue publishing from remote producer

Make sure you have host.name in you server properties setup right. I usually give it the ec2 DNS name. Another case where that helped:
https://discuss.elastic.co/t/logstash-kafka-output-
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
plugins-
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
not-working-on-windows
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
/25253/3?u=
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
joe
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
_lawson
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
and here:
https://discuss.elastic.co/t/logstash-1-5-3-is-not-able-to-connect-to-kafka/27611/5?u=joe_lawson
On Sep 26, 2015 1:38 AM, "Shrikant Patel" <SP...@pdxinc.com> wrote:

> I have Kafka 2.10-0.8.2.1 and zookeeper installed on the AWS EC2. The
> instance is working fine.
>
> [kafka@ip-xx-xx-xx-xx bin]$ ./kafka-topics.sh --topic topic1
> --zookeeper
> localhost:2181 --describe
> Topic:topic1    PartitionCount:1        ReplicationFactor:1     Configs:
>         Topic: topic1   Partition: 0    Leader: 0       Replicas: 0
>  Isr: 0
>
> When producer running on my local machine (remote) tries to publish to
> topic1 it fails with exception below. It does create the topic1 (If
> topic1 didn't exist), Also on kakfa log I do see that producer was
> able to connect to kafka server.
> [2015-09-26 01:24:26,874] INFO Closing socket connection to
> /X0X.23X.X4X.X3X. (kafka.network.Processor). These indicate the local
> producer is able to connect to kafka instance on AWS.
>
> If I run producer on Kafka server itself, its able to publish to
> topic1 without any issue. I am not sure I understand what the issue
> with local producer. Any pointer will be helpful???
>
> C:\JAVA_INSTALLATION\kafka\kafka_2.10-0.8.2.1>bin\windows\kafka-consol
> e-producer.bat --broker-list public-ip-address-of-kafka-ec2:9092
> --topic topic1 --queue-enqueuetimeout-ms 10000 --request-timeout-ms
> 10000
> [2015-09-25 23:08:00,146] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
> 12
> [2015-09-25 23:08:02,977] WARN Failed to send producer request with
> correlation id 2 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>        at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:4
> 5)
> [2015-09-25 23:08:03,653] WARN Failed to send producer request with
> correlation id 5 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:4
> 5)
> [2015-09-25 23:08:04,296] WARN Failed to send producer request with
> correlation id 8 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:4
> 5)
> [2015-09-25 23:08:04,954] WARN Failed to send producer request with
> correlation id 11 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:4
> 5)
> [2015-09-25 23:08:05,597] ERROR Failed to send requests for topics
> topic1 with correlation ids in [0,12]
> (kafka.producer.async.DefaultEventHandler)
> [2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages
> after 3 tries.
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:4
> 5)
>
>
> [2015-09-25 23:08:04,954] WARN Failed to send producer request with
> correlation id 11 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:4
> 5)
> [2015-09-25 23:08:05,597] ERROR Failed to send requests for topics
> topic1 with correlation ids in [0,12]
> (kafka.producer.async.DefaultEventHandler)
> [2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages
> after 3 tries.
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:4
> 5)
>
> Thanks,
> Shri
>
>
> ________________________________
> This message and its contents (to include attachments) are the
> property of National Health Systems, Inc. and may contain confidential
> and proprietary information. This email and any files transmitted with
> it are intended solely for the use of the individual or entity to whom they are addressed.
> You are hereby notified that any unauthorized disclosure, copying, or
> distribution of this message, or the taking of any unauthorized action
> based on information contained herein is strictly prohibited.
> Unauthorized use of information contained herein may subject you to
> civil and criminal prosecution and penalties. If you are not the
> intended recipient, you should delete this message immediately and
> notify the sender immediately by telephone or by replying to this transmission.
>

This message and its contents (to include attachments) are the property of National Health Systems, Inc. and may contain confidential and proprietary information. This email and any files transmitted with it are intended solely for the use of the individual or entity to whom they are addressed. You are hereby notified that any unauthorized disclosure, copying, or distribution of this message, or the taking of any unauthorized action based on information contained herein is strictly prohibited. Unauthorized use of information contained herein may subject you to civil and criminal prosecution and penalties. If you are not the intended recipient, you should delete this message immediately and notify the sender immediately by telephone or by replying to this transmission.

Re: Kafka on AWS - issue publishing from remote producer

Posted by Joe Lawson <jl...@opensourceconnections.com>.
Make sure you have host.name in you server properties setup right. I
usually give it the ec2 DNS name. Another case where that helped:
https://discuss.elastic.co/t/logstash-kafka-output-
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
plugins-
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
not-working-on-windows
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
/25253/3?u=
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
joe
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
_lawson
<https://discuss.elastic.co/t/logstash-kafka-output-plugins-not-working-on-windows/25253/3?u=joe_lawson>
and here:
https://discuss.elastic.co/t/logstash-1-5-3-is-not-able-to-connect-to-kafka/27611/5?u=joe_lawson
On Sep 26, 2015 1:38 AM, "Shrikant Patel" <SP...@pdxinc.com> wrote:

> I have Kafka 2.10-0.8.2.1 and zookeeper installed on the AWS EC2. The
> instance is working fine.
>
> [kafka@ip-xx-xx-xx-xx bin]$ ./kafka-topics.sh --topic topic1 --zookeeper
> localhost:2181 --describe
> Topic:topic1    PartitionCount:1        ReplicationFactor:1     Configs:
>         Topic: topic1   Partition: 0    Leader: 0       Replicas: 0
>  Isr: 0
>
> When producer running on my local machine (remote) tries to publish to
> topic1 it fails with exception below. It does create the topic1 (If topic1
> didn't exist), Also on kakfa log I do see that producer was able to connect
> to kafka server.
> [2015-09-26 01:24:26,874] INFO Closing socket connection to
> /X0X.23X.X4X.X3X. (kafka.network.Processor). These indicate the local
> producer is able to connect to kafka instance on AWS.
>
> If I run producer on Kafka server itself, its able to publish to topic1
> without any issue. I am not sure I understand what the issue with local
> producer. Any pointer will be helpful???
>
> C:\JAVA_INSTALLATION\kafka\kafka_2.10-0.8.2.1>bin\windows\kafka-console-producer.bat
> --broker-list public-ip-address-of-kafka-ec2:9092 --topic topic1
> --queue-enqueuetimeout-ms 10000 --request-timeout-ms 10000
> [2015-09-25 23:08:00,146] WARN Property topic is not valid
> (kafka.utils.VerifiableProperties)
> 12
> [2015-09-25 23:08:02,977] WARN Failed to send producer request with
> correlation id 2 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>        at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-09-25 23:08:03,653] WARN Failed to send producer request with
> correlation id 5 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-09-25 23:08:04,296] WARN Failed to send producer request with
> correlation id 8 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-09-25 23:08:04,954] WARN Failed to send producer request with
> correlation id 11 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1
> with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
> [2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>
>
> [2015-09-25 23:08:04,954] WARN Failed to send producer request with
> correlation id 11 to broker 0 with data for partitions [topic1,0]
> (kafka.producer.async.DefaultEventHandler)
> java.nio.channels.ClosedChannelException
>         at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
>         at
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1$$anonfun$apply$mcV$sp$1.apply(SyncProducer.scala:103)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply$mcV$sp(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at
> kafka.producer.SyncProducer$$anonfun$send$1.apply(SyncProducer.scala:102)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$send(DefaultEventHandler.scala:255)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:106)
>         at
> kafka.producer.async.DefaultEventHandler$$anonfun$dispatchSerializedData$2.apply(DefaultEventHandler.scala:100)
>         at
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at
> scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:100)
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:72)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
> [2015-09-25 23:08:05,597] ERROR Failed to send requests for topics topic1
> with correlation ids in [0,12] (kafka.producer.async.DefaultEventHandler)
> [2015-09-25 23:08:05,597] ERROR Error in handling batch of 1 events
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3
> tries.
>         at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>         at
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:88)
>         at
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:68)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:547)
>         at
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:67)
>         at
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
>
> Thanks,
> Shri
>
>
> ________________________________
> This message and its contents (to include attachments) are the property of
> National Health Systems, Inc. and may contain confidential and proprietary
> information. This email and any files transmitted with it are intended
> solely for the use of the individual or entity to whom they are addressed.
> You are hereby notified that any unauthorized disclosure, copying, or
> distribution of this message, or the taking of any unauthorized action
> based on information contained herein is strictly prohibited. Unauthorized
> use of information contained herein may subject you to civil and criminal
> prosecution and penalties. If you are not the intended recipient, you
> should delete this message immediately and notify the sender immediately by
> telephone or by replying to this transmission.
>