You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Guozhang Wang (JIRA)" <ji...@apache.org> on 2014/07/18 17:23:04 UTC

[jira] [Commented] (KAFKA-1150) Fetch on a replicated topic does not return as soon as possible

    [ https://issues.apache.org/jira/browse/KAFKA-1150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14066435#comment-14066435 ] 

Guozhang Wang commented on KAFKA-1150:
--------------------------------------

Simon, I am currently working on KAFKA-1430, which hopefully will fixes both problems reported here. You can probably take a look at the ticket for now and let me know if you think anything is still missing.

> Fetch on a replicated topic does not return as soon as possible
> ---------------------------------------------------------------
>
>                 Key: KAFKA-1150
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1150
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, replication
>    Affects Versions: 0.8.0
>            Reporter: Andrey Balmin
>            Assignee: Neha Narkhede
>         Attachments: Test.java
>
>
> I see a huge performance difference between replicated and not replicated topics. On my laptop, running two brokers, I see producer-2-consumer latency of under 1ms for topics with one replica. 
> However,  with two replicas the same latency equals to the max fetch delay. Here is a simple test I just did:
> one producer thread in a loop sending one message and sleeping for 2500ms, and one consumer thread looping on the long poll with max fetch delay of 1000 ms.
> Here is what happens with no replication:
> Produced 1	 key: key1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:52.822
> Consumed up to 1 at time: 15:33:53.823
> Consumed up to 1 at time: 15:33:54.825
> Produced 2	 key: key2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:55.324
> Consumed up to 2 at time: 15:33:56.326
> Consumed up to 2 at time: 15:33:57.328
> Produced 3	 key: key3 at time: 15:33:57.827
> Consumed up to 3 at time: 15:33:57.827
> The are no delays between the message being produced and consumed -- this is the behavior I expected. 
> Here is the same test, but for a topic with two replicas:
> Consumed up to 0 at time: 15:50:29.575
> Produced 1	 key: key1 at time: 15:50:29.575
> Consumed up to 1 at time: 15:50:30.577
> Consumed up to 1 at time: 15:50:31.579
> Consumed up to 1 at time: 15:50:32.078
> Produced 2	 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> Consumed up to 2 at time: 15:50:34.081
> Consumed up to 2 at time: 15:50:34.581
> Produced 3	 key: key3 at time: 15:50:34.581
> Consumed up to 3 at time: 15:50:35.584
> Notice how the fetch always returns as soon as the produce request is issued, but without the new message, which consistently arrives ~1002 ms later.
> Below is the request log snippet for this part:
> Produced 2	 key: key2 at time: 15:50:32.078
> Consumed up to 2 at time: 15:50:33.081
> You can see the first FetchRequest returns at the same time as the replica FetchRequest, but this fetch response is *empty* -- the message is not committed yet, so it cannot be returned. The message is committed at 15:50:32,079. However, the next FetchRequest (that does return the message) comes in at 15:50:32,078, but completes only at 15:50:33,081. Why is it waiting for the full 1000 ms, instead of returning right away?
> [2013-11-25 15:50:32,077] TRACE Processor 1 received request : Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 7; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:499,queueTime:0,localTime:0,remoteTime:499,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3463; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1048576) from client /127.0.0.1:63056;totalTime:499,queueTime:1,localTime:0,remoteTime:498,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,078] TRACE Processor 1 received request : Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,078] TRACE Completed request:Name: ProducerRequest; Version: 0; CorrelationId: 5; ClientId: pro; RequiredAcks: 1; AckTimeoutMs: 20 ms; TopicAndPartition: [test_topic,0] -> 2078 from client /0:0:0:0:0:0:0:1%0:63266;totalTime:1,queueTime:0,localTime:1,remoteTime:0,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,079] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:32,581] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 3464; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) from client /127.0.0.1:63056;totalTime:503,queueTime:1,localTime:0,remoteTime:502,sendTime:0 (kafka.request.logger)
> [2013-11-25 15:50:32,582] TRACE Processor 0 received request : Name: FetchRequest; Version: 0; CorrelationId: 3465; ClientId: ReplicaFetcherThread-0-0; ReplicaId: 1; MaxWait: 500 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(130,1048576) (kafka.network.RequestChannel$)
> [2013-11-25 15:50:33,081] TRACE Completed request:Name: FetchRequest; Version: 0; CorrelationId: 8; ClientId: con; ReplicaId: -1; MaxWait: 1000 ms; MinBytes: 1 bytes; RequestInfo: [test_topic,0] -> PartitionFetchInfo(129,1024000) from client /0:0:0:0:0:0:0:1%0:63264;totalTime:1003,queueTime:0,localTime:1,remoteTime:1001,sendTime:1 (kafka.request.logger)
> ----------
> Environment note: I first noticed this behavior on three brokers running on three Ubuntu EC2 instances. I then boiled it down to this simple test running two brokers on a Mac laptop.



--
This message was sent by Atlassian JIRA
(v6.2#6252)