You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Mudassir Maredia <sh...@gmail.com> on 2016/10/05 02:34:17 UTC

KafkaConsumer poll poor performance (0.10.0.0)

I am using KafkaConsumer from 0.10.0.0 version. Below is the sample code. I
have tried to optimize it to get to a maximum number but that number is
really very small. At times I have seen 80,000 records processed per second
but at times it shows only 10,000 records per second. I have tried
different values for poll(long ms) along with different properties but no
luck.

Previously, I have used 0.8.0.0 version createMessageStream(topicCountMap)
works great, 180,000 per second with some processing involved too.

So my questions are:

1. Can someone share the number of records per second processed  using
0.10.0.0 KafkaConsumer?
2. How to optimize to get the max out of KafkaConsumer, forget processing.
3. If I do records.count() after poll(),  it shows 0 many many times. Why
does it return 0?




         Properties props = new Properties();
         props.put("bootstrap.servers", "kafka-app.server.com:9092");
         props.put("group.id", "eclipse-3");
         props.put("enable.auto.commit", "true");
         props.put("auto.commit.interval.ms", "1000");
         props.put("session.timeout.ms", "30000");
         props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
         props.put("auto.offset.reset", "earliest"); //latest, earliest,
none



         KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);

         List<String> topicsList = new ArrayList<>();

         topicsList.add("topic1");

         consumer.subscribe(topicsList);

         int index=0;
         long poll = 0;
         while (true) {

             ConsumerRecords<String,String> records = consumer.poll(poll);

             System.out.println("number of records: "+records.count());

             for (ConsumerRecord<String,String> record: records){

                 if(index%5000==0){
                    //calculateTime(System.currentTimeMillis())
                 }
                 index++;
             }
             }