Posted to dev@kafka.apache.org by "Jun Rao (Commented) (JIRA)" <ji...@apache.org> on 2012/02/13 19:09:00 UTC

[jira] [Commented] (KAFKA-270) sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html

    [ https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13207023#comment-13207023 ] 

Jun Rao commented on KAFKA-270:
-------------------------------

A few things to try:

1. It seems that there are ZK session expirations in the client. These should be rare; if they are frequent, they are very likely caused by GC pauses in the client. Please check your GC log (example JVM flags for enabling it are sketched below).
2. Enable debug-level logging for FileMessageSet in the broker (see the log4j line below). You will then see the flush time for each log write. Check whether the flush time is reasonable (typically in the low tens of ms), since it controls how many I/Os a broker can do per second.
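
For 1, assuming the producer/consumer runs on a Sun/Oracle HotSpot JVM (which the sun.nio stack traces below suggest), GC logging for the client process can be enabled with flags along these lines on its java command line (the log file path is just an example):

    -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps -Xloggc:/tmp/kafka-client-gc.log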
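
For 2, a minimal way to do this, assuming the broker is started with the stock config/log4j.properties from the 0.7 distribution, is to add a line like the following (the logger name matches the kafka.message.FileMessageSet class in the stack trace below) and restart the broker:

    log4j.logger.kafka.message.FileMessageSet=DEBUG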
                
>  sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-270
>                 URL: https://issues.apache.org/jira/browse/KAFKA-270
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, core
>    Affects Versions: 0.7
>         Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux 
> ext3 file system with raid10
>            Reporter: Praveen Ramachandra
>              Labels: clients, core, newdev, performance
>
> I am getting ridiculously low producer and consumer throughput.
> I am using the default config values for the producer, consumer and broker,
> which are good starting points and should yield sufficient throughput.
> I would appreciate it if you could point out what settings or code changes
> need to be made to get higher throughput.
> I changed num.partitions in the server.config to 10.
> Please look below for the exception and error messages from the server.
> BTW: I am running the server, ZooKeeper, producer, and consumer on the same host.
> ====Consumer Code=====
>        long startTime = System.currentTimeMillis();
>        long endTime = startTime + runDuration*1000l;
>        Properties props = new Properties();
>        props.put("zk.connect", "localhost:2181");
>        props.put("groupid", subscriptionName); // to support multiple subscribers
>        props.put("zk.sessiontimeout.ms", "400");
>        props.put("zk.synctime.ms", "200");
>        props.put("autocommit.interval.ms", "1000");
>        consConfig =  new ConsumerConfig(props);
>        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
>        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>        topicCountMap.put(topicName, new Integer(1)); // the topic to subscribe to
>        Map<String, List<KafkaMessageStream<Message>>> consumerMap = consumer.createMessageStreams(topicCountMap);
>        KafkaMessageStream<Message> stream =  consumerMap.get(topicName).get(0);
>        ConsumerIterator<Message> it = stream.iterator();
>        while(System.currentTimeMillis() <= endTime )
>        {
>            it.next(); // discard data
>            consumeMsgCount.incrementAndGet();
>        }
> ====End consumer CODE============================
> =====Producer CODE========================
>        props.put("serializer.class", "kafka.serializer.StringEncoder");
>        props.put("zk.connect", "localhost:2181");
>        // Use random partitioner. Don't need the key type; just set it to Integer.
>        // The message is of type String.
>        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
>        long endTime = startTime + runDuration*1000l; // run duration is in seconds
>        while(System.currentTimeMillis() <= endTime )
>        {
>            String msg = org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
>            producer.send(new ProducerData<Integer, String>(topicName, msg));
>            pc.incrementAndGet();
>        }
>        java.util.Date date = new java.util.Date(System.currentTimeMillis());
>        System.out.println(date+" :: stopped producer for topic"+topicName);
> =====END Producer CODE========================
> I see a bunch of exceptions like this:
> [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> 	at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
> 	at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
> 	at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
> 	at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
> 	at kafka.network.MultiSend.writeTo(Transmission.scala:95)
> 	at kafka.network.Processor.write(SocketServer.scala:332)
> 	at kafka.network.Processor.run(SocketServer.scala:209)
> 	at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
> 	at sun.nio.ch.FileDispatcher.read0(Native Method)
> 	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> 	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> 	at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> 	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> 	at kafka.utils.Utils$.read(Utils.scala:485)
> 	at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> 	at kafka.network.Processor.read(SocketServer.scala:304)
> 	at kafka.network.Processor.run(SocketServer.scala:207)
> 	at java.lang.Thread.run(Thread.java:662)
> And many INFO messages, e.g.:
> INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> INFO: Closed socket connection for client /127.0.0.1:59884 which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)
