You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "wangxu(alvin) (JIRA)" <ji...@apache.org> on 2013/11/13 05:48:22 UTC
[jira] [Commented] (KAFKA-270) sync producer / consumer test
producing lot of kafka server exceptions & not getting the throughput
mentioned here http://incubator.apache.org/kafka/performance.html
[ https://issues.apache.org/jira/browse/KAFKA-270?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13820938#comment-13820938 ]
wangxu(alvin) commented on KAFKA-270:
-------------------------------------
can also try closing the producer.
I would get the below error in broker console without closing producer. If I close the producer, the error will never appear.
[2013-11-05 14:07:34,097] ERROR Closing socket for /192.168.30.114 because of error (kafka.network.Processor)
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcher.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
at sun.nio.ch.IOUtil.read(IOUtil.java:171)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
at kafka.utils.Utils$.read(Utils.scala:538)
at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
at kafka.network.Processor.read(SocketServer.scala:311)
at kafka.network.Processor.run(SocketServer.scala:214)
at java.lang.Thread.run(Thread.java:662)
> sync producer / consumer test producing lot of kafka server exceptions & not getting the throughput mentioned here http://incubator.apache.org/kafka/performance.html
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-270
> URL: https://issues.apache.org/jira/browse/KAFKA-270
> Project: Kafka
> Issue Type: Bug
> Components: clients, core
> Affects Versions: 0.7
> Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux
> ext3 file system with raid10
> Reporter: Praveen Ramachandra
> Labels: clients, core, newdev, performance
>
> I am getting ridiculously low producer and consumer throughput.
> I am using default config values for producer, consumer and broker
> which are very good starting points, as they should yield sufficient
> throughput.
> Appreciate if you point what settings/changes-in-code needs to be done
> to get higher throughput.
> I changed num.partitions in the server.config to 10.
> Please look below for exception and error messages from the server
> BTW: I am running server, zookeeper, producer, consumer on the same host.
> ====Consumer Code=====
> long startTime = System.currentTimeMillis();
> long endTime = startTime + runDuration*1000l;
> Properties props = new Properties();
> props.put("zk.connect", "localhost:2181");
> props.put("groupid", subscriptionName); // to support multiple
> subscribers
> props.put("zk.sessiontimeout.ms", "400");
> props.put("zk.synctime.ms", "200");
> props.put("autocommit.interval.ms", "1000");
> consConfig = new ConsumerConfig(props);
> consumer =
> kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);
> Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
> topicCountMap.put(topicName, new Integer(1)); // has the topic
> to which to subscribe to
> Map<String, List<KafkaMessageStream<Message>>> consumerMap =
> consumer.createMessageStreams(topicCountMap);
> KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
> ConsumerIterator<Message> it = stream.iterator();
> while(System.currentTimeMillis() <= endTime )
> {
> it.next(); // discard data
> consumeMsgCount.incrementAndGet();
> }
> ====End consumer CODE============================
> =====Producer CODE========================
> props.put("serializer.class", "kafka.serializer.StringEncoder");
> props.put("zk.connect", "localhost:2181");
> // Use random partitioner. Don't need the key type. Just
> set it to Integer.
> // The message is of type String.
> producer = new kafka.javaapi.producer.Producer<Integer,
> String>(new ProducerConfig(props));
> long endTime = startTime + runDuration*1000l; // run duration
> is in seconds
> while(System.currentTimeMillis() <= endTime )
> {
> String msg =
> org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
> producer.send(new ProducerData<Integer, String>(topicName, msg));
> pc.incrementAndGet();
> }
> java.util.Date date = new java.util.Date(System.currentTimeMillis());
> System.out.println(date+" :: stopped producer for topic"+topicName);
> =====END Producer CODE========================
> I see a bunch of exceptions like this
> [2012-02-11 02:44:11,945] ERROR Closing socket for /188.125.88.145 because of error (kafka.network.Processor)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileChannelImpl.transferTo0(Native Method)
> at sun.nio.ch.FileChannelImpl.transferToDirectly(FileChannelImpl.java:405)
> at sun.nio.ch.FileChannelImpl.transferTo(FileChannelImpl.java:506)
> at kafka.message.FileMessageSet.writeTo(FileMessageSet.scala:107)
> at kafka.server.MessageSetSend.writeTo(MessageSetSend.scala:51)
> at kafka.network.MultiSend.writeTo(Transmission.scala:95)
> at kafka.network.Processor.write(SocketServer.scala:332)
> at kafka.network.Processor.run(SocketServer.scala:209)
> at java.lang.Thread.run(Thread.java:662)
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcher.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:198)
> at sun.nio.ch.IOUtil.read(IOUtil.java:171)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
> at kafka.utils.Utils$.read(Utils.scala:485)
> at kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
> at kafka.network.Processor.read(SocketServer.scala:304)
> at kafka.network.Processor.run(SocketServer.scala:207)
> at java.lang.Thread.run(Thread.java:662)
> And Many INFO messages e.g.,
> INFO: Expiring session 0x1356a43167e0009, timeout of 6000ms exceeded (org.apache.zookeeper.server.ZooKeeperServer)
> INFO: Closed socket connection for client /127.0.0.1:59884 which had sessionid 0x1356a43167e0022 (org.apache.zookeeper.server.NIOServerCnxn)
--
This message was sent by Atlassian JIRA
(v6.1#6144)