Posted to dev@kafka.apache.org by Praveen Ramachandra <pr...@gmail.com> on 2012/02/10 10:57:10 UTC

Very slow producer and consumer throughput: Using Kafka 0.7.0

Hi All,


I am getting ridiculously low producer and consumer throughput.

I am using the default config values for the producer, consumer and
broker, which should be good starting points and yield sufficient
throughput.

The only config that I changed on the server is "num.partitions". I
changed it to 20 (instead of 1). With this change the throughput
increased to 2k messages per second (message size 1k), but that is
still far lower than I expected.

I would appreciate it if you could point me to the settings or code
changes needed to get higher throughput.



====Consumer Code=====
        long startTime = System.currentTimeMillis();
        long endTime = startTime + runDuration * 1000L; // runDuration is in seconds

        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("groupid", subscriptionName); // to support multiple subscribers
        props.put("zk.sessiontimeout.ms", "400");
        props.put("zk.synctime.ms", "200");
        props.put("autocommit.interval.ms", "1000");

        consConfig = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consConfig);

        // Map the topic to subscribe to onto the number of streams requested for it.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topicName, Integer.valueOf(1));
        Map<String, List<KafkaMessageStream<Message>>> consumerMap =
                consumer.createMessageStreams(topicCountMap);
        KafkaMessageStream<Message> stream = consumerMap.get(topicName).get(0);
        ConsumerIterator<Message> it = stream.iterator();

        // Count messages until the run duration elapses.
        while (System.currentTimeMillis() <= endTime) {
            it.next(); // discard data
            consumeMsgCount.incrementAndGet();
        }

====End consumer CODE============================


=====Producer CODE========================
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "localhost:2181");
        // Use the random partitioner. The key type is not needed, so just
        // set it to Integer. The message is of type String.
        producer = new kafka.javaapi.producer.Producer<Integer, String>(
                new ProducerConfig(props));

        long startTime = System.currentTimeMillis();
        long endTime = startTime + runDuration * 1000L; // run duration is in seconds
        while (System.currentTimeMillis() <= endTime) {
            String msg =
                    org.apache.commons.lang.RandomStringUtils.random(msgSizes.get(0));
            producer.send(new ProducerData<Integer, String>(topicName, msg));
            pc.incrementAndGet(); // count of messages sent
        }
        java.util.Date date = new java.util.Date();
        System.out.println(date + " :: stopped producer for topic " + topicName);

=====END Producer CODE========================
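
To turn a counter such as pc from the loop above into a rate, a small
helper along these lines could be used. This is only a sketch: the class
and method names are hypothetical, and the figures in main simply restate
the 2k messages per second at 1k each mentioned earlier.

```java
public class ThroughputReport {
    // Messages per second for a given message count and elapsed wall-clock time.
    static double messagesPerSecond(long messageCount, long elapsedMillis) {
        return messageCount * 1000.0 / elapsedMillis;
    }

    // Payload rate in MB/s (1 MB = 1024 * 1024 bytes), assuming fixed-size messages.
    static double megabytesPerSecond(long messageCount, int messageSizeBytes, long elapsedMillis) {
        return messagesPerSecond(messageCount, elapsedMillis) * messageSizeBytes / (1024.0 * 1024.0);
    }

    public static void main(String[] args) {
        long count = 2000;          // hypothetical: e.g. pc.get() after the run
        long elapsedMillis = 1000;  // hypothetical: run duration in milliseconds
        int sizeBytes = 1024;       // hypothetical: fixed message size
        System.out.println(messagesPerSecond(count, elapsedMillis) + " msg/s");
        System.out.println(megabytesPerSecond(count, sizeBytes, elapsedMillis) + " MB/s");
    }
}
```

At 2k messages of 1k each, this works out to roughly 2 MB/s of payload.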


--
Regards,
Praveen Ramachandra




Re: Very slow producer and consumer throughput: Using Kafka 0.7.0

Posted by Hisham Mardam-Bey <hi...@mate1inc.com>.

Hi Praveen,

I've not used the Java API for Kafka (we use Scala here), so this might
not apply, but is your producer sync or async? You can add the
following to make it async:

props.put("producer.type", "async");

Take a look at: http://incubator.apache.org/kafka/quickstart.html
(particularly bullet point 10):

"Use the asynchronous producer along with GZIP compression. This
buffers writes in memory until either batch.size or queue.time is
reached. After that, data is sent to the Kafka brokers".
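
As a rough sketch of the suggestion above, the producer properties might
look like the following. Only producer.type, batch.size and queue.time
are named in this thread; the compression.codec key, its value 1 for
GZIP, and all numeric values are assumptions to check against the 0.7
producer configuration, and the class name AsyncProducerProps is made up
for illustration.

```java
import java.util.Properties;

public class AsyncProducerProps {
    // Builds producer properties for the asynchronous, batched mode.
    static Properties build() {
        Properties props = new Properties();
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("zk.connect", "localhost:2181");
        props.put("producer.type", "async");  // buffer sends in memory instead of sending each message on its own
        props.put("batch.size", "200");       // assumed value: messages per batch
        props.put("queue.time", "5000");      // assumed value: max milliseconds to buffer before sending
        props.put("compression.codec", "1");  // assumption: 1 selects GZIP in 0.7
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        // These properties would then be passed to new ProducerConfig(props),
        // as in the producer code from the original post.
        System.out.println("producer.type=" + props.getProperty("producer.type"));
    }
}
```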

Hope this helps,

hmb.

-- 
Hisham Mardam Bey

A: Because it messes up the order in which people normally read text.
Q: Why is top-posting such a bad thing?
A: Top-posting.
Q: What is the most annoying thing in e-mail?

-=[ Codito Ergo Sum ]=-

Re: Very slow producer and consumer throughput: Using Kafka 0.7.0

Posted by Jun Rao <ju...@gmail.com>.
It could be that flush.interval in the broker is 1. Try increasing that to
a larger number like 1000.
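
One way to check this is to load the broker's properties and look at the
value. The sketch below assumes the full key in the 0.7 server.properties
is log.flush.interval (messages accumulated per partition before a flush);
the class and method names are hypothetical, and the inline string stands
in for the real file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class FlushIntervalCheck {
    // Returns true when the config explicitly asks for a disk flush after every message.
    // A missing key returns false, since the broker default is not checked here.
    static boolean flushesEveryMessage(Properties brokerProps) {
        // Assumption: the 0.7 broker key is "log.flush.interval".
        String value = brokerProps.getProperty("log.flush.interval");
        return value != null && Integer.parseInt(value.trim()) <= 1;
    }

    public static void main(String[] args) throws IOException {
        Properties brokerProps = new Properties();
        // Stand-in for the contents of the broker's server.properties file.
        brokerProps.load(new StringReader("log.flush.interval=1\n"));
        if (flushesEveryMessage(brokerProps)) {
            System.out.println("Broker flushes on every message; try a larger value such as 1000.");
        }
    }
}
```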

Thanks,

Jun

Re: Very slow producer and consumer throughput: Using Kafka 0.7.0

Posted by Tim Lossen <ti...@lossen.de>.
Praveen, we had the same problem at first. Make sure you do not flush to disk after every single message; this kills throughput. I am not sure what the config option is called, but it was on by default.

tim
