Posted to dev@kafka.apache.org by "Rasmeet Devji (JIRA)" <ji...@apache.org> on 2014/12/22 17:39:13 UTC

[jira] [Comment Edited] (KAFKA-1693) Issue sending more messages to single Kafka server (Load testing for Kafka transport)

    [ https://issues.apache.org/jira/browse/KAFKA-1693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14255900#comment-14255900 ] 

Rasmeet Devji edited comment on KAFKA-1693 at 12/22/14 4:38 PM:
----------------------------------------------------------------

I faced the same issue with Kafka 0.8.1.1 and Java 6 on CentOS release 5.8 (Final).

I was able to resolve it by not opening too many connections to the Kafka server.

As this Stack Overflow answer (http://stackoverflow.com/questions/6145108/problem-running-into-java-net-bindexception-cannot-assign-requested-address) mentions:
By default, Linux picks dynamically assigned ports from the range 32768..61000. The others are available for static assignment, if you bind to a specific port number. The range can be changed if you want more of the ports to be available for dynamic assignment, but just be careful that you do not include ports that are used for specific services that you need (e.g. 6000 for X11). Also you should not allow ports < 1024 to be dynamically assigned since they are privileged.
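
As a rough sanity check on the range quoted above, the arithmetic can be sketched in Java. The class and method names here are made up for illustration; /proc/sys/net/ipv4/ip_local_port_range is where Linux exposes the configured range, and the fallback value is simply the default quoted above. The quoted default works out to 28233 ports, which is close to the 28000 messages after which the reporter saw the exception. That is consistent with one new connection per message, though it does not prove it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Illustrative helper (hypothetical names): how many ephemeral ports are available? */
final class PortBudget {

    /** Number of ports in the inclusive range [low, high]. */
    static int portCount(int low, int high) {
        if (low > high) {
            throw new IllegalArgumentException("low must not exceed high");
        }
        return high - low + 1;
    }

    /** Parses the two whitespace-separated numbers found in ip_local_port_range. */
    static int[] parseRange(String text) {
        String[] parts = text.trim().split("\\s+");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected two numbers, got: " + text);
        }
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }

    public static void main(String[] args) throws IOException {
        Path path = Paths.get("/proc/sys/net/ipv4/ip_local_port_range");
        // Fall back to the default quoted above when the file is not available (e.g. not Linux).
        String text = Files.isReadable(path)
                ? new String(Files.readAllBytes(path), StandardCharsets.UTF_8)
                : "32768 61000";
        int[] range = parseRange(text);
        System.out.println("ephemeral ports available: " + portCount(range[0], range[1]));
    }
}
```

Widening the range only raises the ceiling; a client that opens a connection per message can still exhaust it under a larger load.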

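What you are most likely doing is creating a producer instance for every message you send. Each new producer opens its own connection to the broker, and each connection takes a local port from that range. Instead, you should use a limited number of producers, or maybe a pool of producers, for your tests.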
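
The difference between the two patterns can be sketched as follows. This is not Kafka code: Sender and CountingFactory are hypothetical stand-ins for a producer and its construction, so that the example runs without a broker or the Kafka client jar. Each sender created models one new broker connection.

```java
/** Sketch only: contrasts a producer per message with one shared producer. */
final class ProducerReuseSketch {

    /** Hypothetical minimal producer interface (not a Kafka API). */
    interface Sender {
        void send(String topic, String message);
    }

    /** Hands out senders and counts them; each one models a new broker connection. */
    static final class CountingFactory {
        private int created = 0;

        Sender create() {
            created++;
            return new Sender() {
                @Override
                public void send(String topic, String message) {
                    // A real producer would write to its broker connection here.
                }
            };
        }

        int created() {
            return created;
        }
    }

    /** Anti-pattern: a new sender, and so a new connection, for every message. */
    static int sendWithNewSenderPerMessage(CountingFactory factory, int messages) {
        for (int i = 0; i < messages; i++) {
            factory.create().send("test1", "message-" + i);
        }
        return factory.created();
    }

    /** Suggested pattern: create one sender up front and reuse it for all messages. */
    static int sendWithSharedSender(CountingFactory factory, int messages) {
        Sender shared = factory.create();
        for (int i = 0; i < messages; i++) {
            shared.send("test1", "message-" + i);
        }
        return factory.created();
    }

    public static void main(String[] args) {
        int messages = 50000;
        System.out.println("senders created, one per message: "
                + sendWithNewSenderPerMessage(new CountingFactory(), messages));
        System.out.println("senders created, shared: "
                + sendWithSharedSender(new CountingFactory(), messages));
    }
}
```

With a real producer, the shared instance would typically be created once at startup and closed on shutdown. The sketch is single-threaded; whether one instance can safely be shared across threads depends on the client library, so check its documentation before doing so.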


was (Author: rasmeet devji):
I faced the same issue with Kafka 0.8.1.1 and Java 6 on CentOS release 5.8 (Final).

I was able to resolve it by not opening too many connections to the Kafka server.

As this Stack Overflow answer (http://stackoverflow.com/questions/6145108/problem-running-into-java-net-bindexception-cannot-assign-requested-address) mentions:
By default, Linux picks dynamically assigned ports from the range 32768..61000. The others are available for static assignment, if you bind to a specific port number. The range can be changed if you want more of the ports to be available for dynamic assignment, but just be careful that you do not include ports that are used for specific services that you need (e.g. 6000 for X11). Also you should not allow ports < 1024 to be dynamically assigned since they are privileged.

What you are most likely doing is creating a producer thread for every message you send. Instead, you should use a limited number of threads, or maybe a thread pool, for your tests.

> Issue sending more messages to single Kafka server (Load testing for Kafka transport)
> -------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1693
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1693
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>         Environment: Ubuntu 14, Java 6
>            Reporter: rajendram kathees
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> I tried to send 50000 messages to a single Kafka server. I sent the messages to the ESB using JMeter, and the ESB sent them to the Kafka server. After 28000 messages I get the following exception. Do I need to change any parameter value on the Kafka server? Please suggest a solution.
>  
> [2014-10-06 11:41:05,182] ERROR - Utils$ fetching topic metadata for topics [Set(test1)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test1)] from broker [ArrayBuffer(id:0,host:localhost,port:9092)] failed
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
> at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
> at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
> at kafka.utils.Utils$.swallow(Utils.scala:167)
> at kafka.utils.Logging$class.swallowError(Logging.scala:106)
> at kafka.utils.Utils$.swallowError(Utils.scala:46)
> at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
> at kafka.producer.Producer.send(Producer.scala:76)
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> at org.wso2.carbon.connector.KafkaProduce.send(KafkaProduce.java:71)
> at org.wso2.carbon.connector.KafkaProduce.connect(KafkaProduce.java:27)
> at org.wso2.carbon.connector.core.AbstractConnector.mediate(AbstractConnector.java:32)
> at org.apache.synapse.mediators.ext.ClassMediator.mediate(ClassMediator.java:78)
> at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:77)
> at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:47)
> at org.apache.synapse.mediators.template.TemplateMediator.mediate(TemplateMediator.java:77)
> at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:129)
> at org.apache.synapse.mediators.template.InvokeMediator.mediate(InvokeMediator.java:78)
> at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:77)
> at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:47)
> at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:131)
> at org.apache.synapse.core.axis2.ProxyServiceMessageReceiver.receive(ProxyServiceMessageReceiver.java:166)
> at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
> at org.apache.synapse.transport.passthru.ServerWorker.processNonEntityEnclosingRESTHandler(ServerWorker.java:344)
> at org.apache.synapse.transport.passthru.ServerWorker.processEntityEnclosingRequest(ServerWorker.java:385)
> at org.apache.synapse.transport.passthru.ServerWorker.run(ServerWorker.java:183)
> at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.net.BindException: Cannot assign requested address
> at sun.nio.ch.Net.connect0(Native Method)
> at sun.nio.ch.Net.connect(Net.java:465)
> at sun.nio.ch.Net.connect(Net.java:457)
> at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:670)
> at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
> at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
> at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
> at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
> at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
> at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)


