Posted to dev@kafka.apache.org by "wangxu(alvin) (JIRA)" <ji...@apache.org> on 2013/11/13 05:48:22 UTC

[jira] [Commented] (KAFKA-270) sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html

    [ https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13820938#comment-13820938 ] 

wangxu(alvin) commented on KAFKA-270:
-------------------------------------

You can also try closing the producer.
Without closing the producer, I get the error below in the broker console. If I close the producer, the error no longer appears.
[2013-11-05 14:07:34,097] ERROR Closing socket for /192.168.30.114 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
        at sun.nio.ch.IOUtil.read(IOUtil.java:171)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
        at kafka.utils.Utils$.read(Utils.scala:538)
        at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
        at kafka.network.Processor.read(SocketServer.scala:311)
        at kafka.network.Processor.run(SocketServer.scala:214)
        at java.lang.Thread.run(Thread.java:662)
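
A minimal sketch of what I mean by closing the producer, assuming the send loop from the issue description. StubProducer is a hypothetical stand-in so the example compiles without Kafka on the classpath; in the real test it would be the kafka.javaapi.producer.Producer instance, and sendAll is an illustrative helper name, not part of Kafka. The point is only that close() runs in a finally block, so the connection is shut down cleanly even if a send throws. My guess is that the broker logs "Connection reset by peer" when the client process exits with the socket still open.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

public class ProducerCloseSketch {

    // Hypothetical stand-in for kafka.javaapi.producer.Producer, used only
    // so that this sketch is self-contained and runnable without Kafka.
    static class StubProducer implements Closeable {
        final AtomicInteger sent = new AtomicInteger();
        volatile boolean closed = false;

        void send(String topic, String msg) {
            if (closed) {
                throw new IllegalStateException("producer already closed");
            }
            sent.incrementAndGet();
        }

        @Override
        public void close() {
            // A real producer would release its broker connections here.
            closed = true;
        }
    }

    // Sends `count` messages and always closes the producer afterwards.
    static int sendAll(StubProducer producer, String topic, int count) {
        try {
            for (int i = 0; i < count; i++) {
                producer.send(topic, "message-" + i);
            }
            return producer.sent.get();
        } finally {
            // Runs on normal completion and when send() throws.
            producer.close();
        }
    }

    public static void main(String[] args) {
        StubProducer producer = new StubProducer();
        int sent = sendAll(producer, "test-topic", 5);
        System.out.println("sent=" + sent + " closed=" + producer.closed);
    }
}
```

In the producer code from the issue, the equivalent change would be to wrap the while loop in try/finally and call producer.close() in the finally block.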

>  sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-270
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 0.7
>         Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux 
> ext3 file system with raid10
>            Reporter: Praveen Ramachandra
>              Labels: clients, core, newdev, performance
>
> I am getting ridiculously low producer and consumer throughput.
> I am using default config values for producer, consumer and broker
> which are very good starting points, as they should yield sufficient
> throughput.
> I would appreciate it if you could point out what settings or code changes
> are needed to get higher throughput.
> I changed num.partitions in the server.config to 10.
> Please look below for exception and error messages from the server
> BTW: I am running server, zookeeper, producer, consumer on the same host.
> ====Consumer Code=====
>        long startTime = System.currentTimeMillis();
>        long endTime = startTime + runDuration*1000L;
>        Properties props = new Properties();
>        props.put("zk.connect", "localhost:2181");
>        props.put("groupid", subscriptionName); // to support multiple subscribers
>        props.put("zk.sessiontimeout.ms", "400");
>        props.put("zk.synctime.ms", "200");
>        props.put("autocommit.interval.ms", "1000");
>        consConfig =  new ConsumerConfig(props);
>        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>        topicCountMap.put(topicName, new Integer(1)); // the topic to subscribe to
>        Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
>        KafkaMessageStream<Message> stream =  consumerMap.get(topicName).get(0);
>        ConsumerIterator<Message> it = stream.iterator();
>        while(System.currentTimeMillis() <= endTime )
>        {
>            it.next(); // discard data
>            consumeMsgCount.incrementAndGet();
>        }
> ====End consumer CODE============================
> =====Producer CODE========================
>        props.put("serializer.class", "kafka.serializer.StringEncoder");
>        props.put("zk.connect", "localhost:2181");
>            // Use random partitioner. Don't need the key type. Just set it to Integer.
>            // The message is of type String.
>            producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
>        long endTime = startTime + runDuration*1000L; // run duration is in seconds
>        while(System.currentTimeMillis() <= endTime )
>        {
>            String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>            producer.send(new ProducerData<Integer, String>(topicName, msg));
>            pc.incrementAndGet();
>        }
>        java.util.Date date = new java.util.Date(System.currentTimeMillis());
>        System.out.println(date+" :: stopped producer for topic "+topicName);
> =====END Producer CODE========================
> I see a bunch of exceptions like this:
> [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> 	at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
> 	at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
> 	at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
> 	at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
> 	at kafka.network.MultiSend.writeTo(Transmission.scala:95)
> 	at kafka.network.Processor.write(SocketServer.scala:332)
> 	at kafka.network.Processor.run(SocketServer.scala:209)
> 	at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileDispatcher.read0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> 	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> 	at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> 	at kafka.utils.Utils$.read(Utils.scala:485)
> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> 	at kafka.network.Processor.read(SocketServer.scala:304)
> 	at kafka.network.Processor.run(SocketServer.scala:207)
> 	at java.lang.Thread.run(Thread.java:662)
> And many INFO messages, e.g.:
> INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> INFO: Closed socket connection for client /127.0.0.1:59884 which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)


