Posted to users@kafka.apache.org by Shamik Banerjee <sh...@yahoo.com.INVALID> on 2016/10/05 03:57:05 UTC

Kafka consumer configuration / performance issues

Hi,

  I'm a newbie trying out Kafka as an alternative to AWS SQS. The motivation is primarily to improve performance, since Kafka would eliminate the SQS constraint of pulling at most 10 messages at a time with a 256 KB cap. Here's a high-level view of my use case. I have a bunch of crawlers sending documents for indexing. The payload size is around 1 MB on average. The crawlers call a SOAP endpoint, which in turn runs producer code to submit the messages to a Kafka topic. The consumer app picks up the messages and processes them. For my test box, I've configured the topic with 30 partitions and a replication factor of 2. The two Kafka brokers are running with one ZooKeeper instance. The Kafka version is 0.10.0.
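
For context, here's roughly what the producer code behind the SOAP endpoint looks like (a simplified sketch, not the exact code; the topic name, variable names, and serializer choice are placeholders). Since payloads average around 1 MB, I raised max.request.size above the producer default; I believe the broker-side message.max.bytes may need a matching bump too, but I haven't confirmed that:

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
// Documents average ~1 MB, so allow requests somewhat larger than the 1 MB default.
producerProps.put("max.request.size", "2097152");

Producer<String, byte[]> producer = new KafkaProducer<>(producerProps);
// docId and payloadBytes come from the incoming SOAP request (placeholders here).
producer.send(new ProducerRecord<>("crawl-documents", docId, payloadBytes));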

For my testing, I published 7 million messages to the topic. I created a consumer group with 30 consumer threads, one per partition. I was initially under the impression that this would substantially speed up processing compared to what I was getting via SQS. Unfortunately, that was not the case. In my case, the processing of each record is complex and takes 1-2 minutes on average to complete. That led to a flurry of partition rebalances, as the threads were not able to heartbeat in time. I could see a bunch of messages in the log citing "Auto offset commit failed for group full_group: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in the poll() with max.poll.records." This led to the same messages being processed multiple times. I tried playing around with the session timeout, max.poll.records, and poll time to avoid this, but that slowed down the overall processing considerably. Here are some of the configuration parameters (a sketch of the tuning I'm considering follows the list):

metadata.max.age.ms = 300000 
max.partition.fetch.bytes = 1048576 
bootstrap.servers = [kafkahost1:9092, kafkahost2:9092] 
enable.auto.commit = true 
max.poll.records = 10000 
request.timeout.ms = 310000 
heartbeat.interval.ms = 100000 
auto.commit.interval.ms = 1000 
receive.buffer.bytes = 65536 
fetch.min.bytes = 1 
send.buffer.bytes = 131072 
value.deserializer = class com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer 
group.id = full_group 
retry.backoff.ms = 100 
fetch.max.wait.ms = 500 
connections.max.idle.ms = 540000 
session.timeout.ms = 300000 
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
metrics.sample.window.ms = 30000 
auto.offset.reset = latest
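
Based on the advice in the error message itself, here's a sketch of the direction I'm considering for the consumer config: shrink max.poll.records so that one batch can be processed well within the session timeout, and keep session.timeout.ms and request.timeout.ms at the values above. The numbers and the topic name below are illustrative guesses, not something I've validated:

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "kafkahost1:9092,kafkahost2:9092");
consumerProps.put("group.id", "full_group");
consumerProps.put("enable.auto.commit", "true");
consumerProps.put("auto.commit.interval.ms", "1000");
// At 1-2 minutes per record, the batch size must satisfy
// max.poll.records * processing-time-per-record < session.timeout.ms (5 minutes here).
consumerProps.put("max.poll.records", "2");
consumerProps.put("session.timeout.ms", "300000");
// request.timeout.ms has to stay larger than session.timeout.ms.
consumerProps.put("request.timeout.ms", "310000");
// heartbeat.interval.ms is typically kept to a fraction (about a third) of the session timeout.
consumerProps.put("heartbeat.interval.ms", "100000");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "com.autodesk.preprocessor.consumer.serializer.KryoObjectSerializer");

KafkaConsumer<String, TextAnalysisRequest> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("crawl-documents"));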

I reduced the consumer poll time to 100 ms. That reduced the rebalancing issues and eliminated the duplicate processing, but it slowed down the overall process significantly. It ended up taking 35 hours to finish processing all 6 million messages, compared to 25 hours with the SQS-based solution. Each consumer thread retrieved 50-60 messages per poll on average, though some polls returned 0 records at times. I'm not sure why that happens when there is a huge number of messages available in the partition; the same thread was able to pick up messages in the subsequent iteration. Could this be due to rebalancing?

Here's my consumer code:

while (true) {
    try {
        // Poll with a 100 ms timeout; may return up to max.poll.records records.
        ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
        for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
            TextAnalysisRequest textAnalysisObj = record.value();
            if (textAnalysisObj != null) {
                // Process record (takes 1-2 minutes on average).
                PreProcessorUtil.submitPostProcessRequest(textAnalysisObj);
            }
        }
    } catch (Exception ex) {
        LOGGER.error("Error in Full Consumer group worker", ex);
    }
}

I understand that the record-processing step is the main bottleneck in my case. But I'm sure a few folks here have a similar use case with long processing times. I've thought of doing the processing asynchronously, either by spinning up a dedicated thread per record or by using a thread pool with a large capacity, but I'm not sure whether that would put too much load on the system. I've also seen a couple of instances where people use the pause and resume APIs to keep the consumer alive while processing, in order to avoid rebalancing issues (a rough sketch of what I have in mind follows).
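
For reference, here's a rough sketch of the pause/resume pattern as I understand it (untested; the pool size is a guess, and the processing call is from my code above). The idea is to hand the polled batch to a worker thread, keep calling poll() so the consumer still heartbeats, and only resume fetching once the batch is done:

ExecutorService workers = Executors.newFixedThreadPool(4); // placeholder pool size

while (true) {
    ConsumerRecords<String, TextAnalysisRequest> records = consumer.poll(100);
    if (!records.isEmpty()) {
        // Stop fetching new data while this batch is being processed.
        consumer.pause(consumer.assignment());

        Future<?> batch = workers.submit(() -> {
            for (ConsumerRecord<String, TextAnalysisRequest> record : records) {
                if (record.value() != null) {
                    PreProcessorUtil.submitPostProcessRequest(record.value());
                }
            }
        });

        // Keep polling so the consumer heartbeats and isn't kicked out of the
        // group; paused partitions return empty record sets.
        while (!batch.isDone()) {
            consumer.poll(100);
        }
        consumer.resume(consumer.assignment());
    }
}

What I'm not sure about is what happens to the paused state if a rebalance occurs in the middle of a batch, and whether auto-commit committing offsets before the batch finishes could be a problem.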

I'm really looking for some advice / best practices for this situation, particularly recommended configuration settings around the heartbeat, request timeout, max poll records, auto commit interval, poll interval, etc. If Kafka is not the right tool for my use case, please let me know that as well.

Any pointers will be appreciated. 

-Thanks,
Shamik

Re: Kafka consumer configuration / performance issues

Posted by Shamik Banerjee <sh...@yahoo.com.INVALID>.
Sorry to bump this up; can anyone provide some input on this? I need to make a call soon on whether Kafka is a good fit for our requirements.


