Posted to dev@kafka.apache.org by "Jay Kreps (JIRA)" <ji...@apache.org> on 2015/01/30 03:17:34 UTC

[jira] [Commented] (KAFKA-1905) KafkaProducer's performance could be halved when MaxInFlightRequest is set to 1

    [ https://issues.apache.org/jira/browse/KAFKA-1905?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14298106#comment-14298106 ] 

Jay Kreps commented on KAFKA-1905:
----------------------------------

It's not every day you double performance by moving a few lines of code around...

> KafkaProducer's performance could be halved when MaxInFlightRequest is set to 1
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-1905
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1905
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jiangjie Qin
>            Assignee: Jiangjie Qin
>
> In KafkaProducer, the following logic is used in each poll():
> 1. Get a list of nodes that have a batch available for sending.
> 2. Filter the list to remove nodes that are not ready to receive a new request (MaxInFlightRequests is checked here).
> 3. Compose requests for the nodes in the filtered list, i.e. nodes that have a batch to send and are also ready to receive.
> 4. Increment InFlightRequests, send the requests, and get the responses to previous sends.
> 5. Handle all receives and decrement InFlightRequests.
> In this case, when MaxInFlightRequest is set to 1, InFlightRequests is checked before the receives are handled, so even if the response has already arrived, the node is still considered not ready. Over a sequence of polls we therefore end up in a PollForSend - PollForReceive - PollForSend... pattern, which essentially halves the throughput on a fast network. Ideally we should check whether a node is ready after we have handled all the receives.
> Here are some logs that show this situation when I run kafka-producer-perf-test locally.
> -----1st poll for send, no receive------
> [2015-01-28 13:54:06,009] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1074,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
> ------ 2nd poll for receive, no send------
> [2015-01-28 13:54:06,009] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,009] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
> ------ 3rd poll for send, no receive------
> [2015-01-28 13:54:06,010] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1075,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
> ---- 4th poll for receive, no send----
> [2015-01-28 13:54:06,010] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,010] INFO responses #: 1 (org.apache.kafka.clients.producer.internals.Sender)
> ---- 5th poll for send, no receive----
> [2015-01-28 13:54:06,011] INFO Nodes with data ready to send: [Node(0, jqin-ld1.linkedin.biz, 9092)] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO Created 1 produce requests: [ClientRequest(expectResponse=true, payload={producer_perf_verification1-0=RecordBatch(topicPartition=producer_perf_verification1-0, recordCount=15)}, request=RequestSend(header={api_key=0,api_version=0,correlation_id=1076,client_id=producer-performance}, body={acks=-1,timeout=3000,topic_data=[{topic=producer_perf_verification1,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=15780 cap=16384]}]}]}))] (org.apache.kafka.clients.producer.internals.Sender)
> [2015-01-28 13:54:06,011] INFO responses #: 0 (org.apache.kafka.clients.producer.internals.Sender)
> ---- 6th poll for receive, no send-----
> [2015-01-28 13:54:06,011] INFO No ready nodes, timeout = 9223372036854775807 (org.apache.kafka.clients.producer.internals.Sender)
> .........



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)