Posted to dev@kafka.apache.org by "Joel Koshy (JIRA)" <ji...@apache.org> on 2013/11/27 03:26:35 UTC

[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

    [ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13833366#comment-13833366 ] 

Joel Koshy commented on KAFKA-1150:
-----------------------------------

Thank you for reporting this. We currently unblock delayed fetch requests when a producer request arrives and its byte count meets the min.bytes requirement of those fetch requests. For regular (non-replica-fetcher) consumers we can only expose data up to the high watermark (HW), so the consumer's fetch request is unblocked before the new message is committed and (typically) returns an empty response. The consumer then refetches, as you found.
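
To make that sequence concrete, here is a minimal toy model of it. This is a sketch only, not the broker's code: ToyPartition, produce, replica_ack, read and unblocks_on_produce are hypothetical names, and the model assumes a single partition whose message sizes are kept in a list indexed by offset.

```python
class ToyPartition:
    """Toy model of one partition on the leader (hypothetical, not Kafka code)."""

    def __init__(self):
        self.sizes = []          # sizes[i] = byte size of the message at offset i
        self.high_watermark = 0  # first offset not yet visible to regular consumers

    def produce(self, size):
        """Append one message and return the number of bytes produced."""
        self.sizes.append(size)
        return size

    def log_end_offset(self):
        return len(self.sizes)

    def replica_ack(self, offset):
        """The follower has fetched everything below `offset`, so the HW can advance."""
        self.high_watermark = min(offset, self.log_end_offset())

    def read(self, fetch_offset, is_replica):
        """Replica fetchers read to the log end; regular consumers only to the HW."""
        limit = self.log_end_offset() if is_replica else self.high_watermark
        return self.sizes[fetch_offset:limit]


def unblocks_on_produce(produced_bytes, min_bytes):
    """The check described above: only the produced byte count is compared."""
    return produced_bytes >= min_bytes


def demo_current_behavior():
    p = ToyPartition()
    produced = p.produce(100)
    woken = unblocks_on_produce(produced, min_bytes=1)  # delayed fetch is woken
    consumer_view = p.read(0, is_replica=False)         # nothing committed yet
    replica_view = p.read(0, is_replica=True)           # follower sees the message
    p.replica_ack(1)                                    # follower's next fetch starts at offset 1
    committed_view = p.read(0, is_replica=False)        # now visible to the consumer
    return woken, consumer_view, replica_view, committed_view
```

In this model demo_current_behavior() returns (True, [], [100], [100]): the consumer's delayed fetch is woken by the produce but reads nothing, and the message only becomes readable after the follower's fetch advances the HW.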

We should ideally unblock these regular consumer fetch requests only when we advance the high watermark. However, the HW is measured in logical offsets, not bytes, so it cannot be compared directly against min.bytes. We could change the min.bytes concept to min.messages, but I think it is worth thinking about a better solution for this.
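
The unit mismatch can be seen in a small sketch. It assumes, purely for illustration, that the broker could look up the size of every message by offset; committed_bytes and should_unblock_on_hw_advance are hypothetical names, and this is not a proposed patch.

```python
def committed_bytes(message_sizes, fetch_offset, high_watermark):
    """Bytes a regular consumer could read: offsets in [fetch_offset, high_watermark).

    message_sizes[i] is assumed to hold the byte size of the message at offset i.
    """
    if high_watermark <= fetch_offset:
        return 0
    return sum(message_sizes[fetch_offset:high_watermark])


def should_unblock_on_hw_advance(message_sizes, fetch_offset, high_watermark, min_bytes):
    """Check min.bytes against committed data only, evaluated when the HW moves."""
    return committed_bytes(message_sizes, fetch_offset, high_watermark) >= min_bytes
```

With message_sizes = [100, 50], a fetch at offset 0 with min_bytes=1 stays blocked while the HW is 0 and is satisfied once the HW reaches 1. The offset range has to be translated into bytes before min.bytes can be applied, which is the step the HW alone does not give us.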


> Fetch on a replicated topic does not return as soon as possible
> ---------------------------------------------------------------
>
>                 Key: KAFKA-1150
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1150
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, replication
>    Affects Versions: 0.8
>            Reporter: Andrey Balmin
>            Assignee: Neha Narkhede
>
> I see a huge performance difference between replicated and non-replicated topics. On my laptop, running two brokers, I see producer-to-consumer latency of under 1 ms for topics with one replica.
> However, with two replicas the same latency equals the max fetch delay. Here is a simple test I just did:
> one producer thread in a loop sending one message and sleeping for 2500 ms, and one consumer thread looping on the long poll with a max fetch delay of 1000 ms.
> Here is what happens with no replication:
> Produced 1	 key: key1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:53.823
> Consumed up to 1 at time: 15:33:54.825
> Produced 2	 key: key2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:56.326
> Consumed up to 2 at time: 15:33:57.328
> Produced 3	 key: key3 at time: 15:33:57.827
> Consumed up to 3 at time: 15:33:57.827
> There are no delays between the message being produced and consumed -- this is the behavior I expected.
> Here is the same test, but for a topic with two replicas:
> Consumed up to 0 at time: 15:50:29.575
> Produced 1	 key: key1 at time: 15:50:29.575
> Consumed up to 1 at time: 15:50:30.577
> Consumed up to 1 at time: 15:50:31.579
> Consumed up to 1 at time: 15:50:32.078
> Produced 2	 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> Consumed up to 2 at time: 15:50:34.081
> Consumed up to 2 at time: 15:50:34.581
> Produced 3	 key: key3 at time: 15:50:34.581
> Consumed up to 3 at time: 15:50:35.584
> Notice how the fetch always returns as soon as the produce request is issued, but without the new message, which consistently arrives ~1002 ms later.
> Below is the request log snippet for this part:
> Produced 2	 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> You can see the first FetchRequest returns at the same time as the replica FetchRequest, but this fetch response is *empty* -- the message is not committed yet, so it cannot be returned. The message is committed at 15:50:32,079. However, the next FetchRequest (that does return the message) comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it waiting for the full 1000 ms, instead of returning right away?
> [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1048576) from client /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,581] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) from client /127.0.0.1:63056;totalTime:503,queueTime:1,localTime:0,remoteTime:502,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,582] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3465; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:33,081] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:1003,queueTime:0,localTime:1,remoteTime:1001,sendTime:1 (kafka.request.logger)
> ----------
> Environment note: I first noticed this behavior on three brokers running on three Ubuntu EC2 instances. I then boiled it down to this simple test running two brokers on a Mac laptop.


