Posted to dev@kafka.apache.org by Joel Koshy <jj...@gmail.com> on 2015/06/05 04:43:09 UTC

Re: Review Request 33378: Patch for KAFKA-2136

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/#review86428
-----------------------------------------------------------


Overall, looks good.

General comment on the naming: delay vs throttle. Personally, I prefer throttle - I think that is clearer from the client perspective.

Should probably add a test to verify that the replica fetchers are never throttled. Or this may be more relevant for your other patch.


clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
<https://reviews.apache.org/r/33378/#comment138801>

    How about recordThrottleTime? and replacing quotaDelay with throttle in other places as well?
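
For illustration only, here is a minimal sketch of what a recordThrottleTime helper tracking the avg and max throttle time could look like. The class name ThrottleTimeStats and its methods are hypothetical; this does not use Kafka's metrics classes and is not the code under review.

```java
// Hypothetical helper, not part of the patch: keeps a running average and
// maximum of the throttle times reported in responses.
final class ThrottleTimeStats {
    private long count;
    private long totalMs;
    private long maxMs;

    // Record the throttle time (in milliseconds) carried by one response.
    void recordThrottleTime(long throttleTimeMs) {
        count++;
        totalMs += throttleTimeMs;
        maxMs = Math.max(maxMs, throttleTimeMs);
    }

    // Average throttle time over all recorded responses (0.0 if none).
    double avg() {
        return count == 0 ? 0.0 : (double) totalMs / count;
    }

    // Largest throttle time seen so far (0 if none).
    long max() {
        return maxMs;
    }
}
```

A response that did not violate any quota would simply record a throttle time of zero, which lowers the average but leaves the max untouched.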



clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
<https://reviews.apache.org/r/33378/#comment138802>

    Can drop this comment



clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
<https://reviews.apache.org/r/33378/#comment138803>

    May be slightly clearer:
    "Duration in milliseconds for which the request was throttled due to quota violation. (Zero if the request did not violate any quota.)"



clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
<https://reviews.apache.org/r/33378/#comment138804>

    same



clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
<https://reviews.apache.org/r/33378/#comment138810>

    requests



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33378/#comment138811>

    follow-up



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33378/#comment138813>

    `if (`



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33378/#comment138812>

    ... from a client that sends a v0 FetchRequest



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/33378/#comment138814>

    `if (`



core/src/main/scala/kafka/api/ProducerResponse.scala
<https://reviews.apache.org/r/33378/#comment138815>

    We should do this based on the response version as well, right?



core/src/main/scala/kafka/api/ProducerResponse.scala
<https://reviews.apache.org/r/33378/#comment138816>

    `if (`



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/33378/#comment138818>

    This is slightly unwieldy. Perhaps we can hold this patch especially since this will be impacted by the main patch (KAFKA-2084)



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/33378/#comment138819>

    similar comment here



core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
<https://reviews.apache.org/r/33378/#comment138823>

    Why do we need this as part of this patch?


- Joel Koshy


On May 12, 2015, 9:42 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33378/
> -----------------------------------------------------------
> 
> (Updated May 12, 2015, 9:42 p.m.)
> 
> 
> Review request for kafka, Joel Koshy and Jun Rao.
> 
> 
> Bugs: KAFKA-2136
>     https://issues.apache.org/jira/browse/KAFKA-2136
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Changes are
> - protocol changes to the fetch request and response to return the throttle_time_ms to clients
> - New producer/consumer metrics to expose the avg and max delay time for a client
> - Test cases.
> 
> For now the patch will publish a zero delay and return a response
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b2db91ca14bbd17fef5ce85839679144fff3f689 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 
>   core/src/main/scala/kafka/api/FetchRequest.scala b038c15186c0cbcc65b59479324052498361b717 
>   core/src/main/scala/kafka/api/FetchResponse.scala 75aaf57fb76ec01660d93701a57ae953d877d81c 
>   core/src/main/scala/kafka/api/ProducerRequest.scala 570b2da1d865086f9830aa919a49063abbbe574d 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 31a2639477bf66f9a05d2b9b07794572d7ec393b 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
>   core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda 
>   core/src/main/scala/kafka/server/KafkaApis.scala 417960dd1ab407ebebad8fdb0e97415db3e91a2f 
>   core/src/main/scala/kafka/server/OffsetManager.scala 18680ce100f10035175cc0263ba7787ab0f6a17a 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a 
>   core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala f3ab3f4ff8eb1aa6b2ab87ba75f72eceb6649620 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79 
> 
> Diff: https://reviews.apache.org/r/33378/diff/
> 
> 
> Testing
> -------
> 
> New tests added
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 33378: Patch for KAFKA-2136

Posted by Aditya Auradkar <aa...@linkedin.com>.

> On June 5, 2015, 2:43 a.m., Joel Koshy wrote:
> > Overall, looks good.
> > 
> > General comment on the naming: delay vs throttle. Personally, I prefer throttle - I think that is clearer from the client perspective.
> > 
> > Should probably add a test to verify that the replica fetchers are never throttled. Or this may be more relevant for your other patch.

I've changed the names to throttle everywhere. I'll add a replica fetcher test to my other patch.


- Aditya


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/#review86428
-----------------------------------------------------------


On June 9, 2015, 5:10 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33378/
> -----------------------------------------------------------
> 
> (Updated June 9, 2015, 5:10 p.m.)
> 
> 
> Review request for kafka, Joel Koshy and Jun Rao.
> 
> 
> Bugs: KAFKA-2136
>     https://issues.apache.org/jira/browse/KAFKA-2136
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Changes are
> - protocol changes to the fetch request and response to return the throttle_time_ms to clients
> - New producer/consumer metrics to expose the avg and max delay time for a client
> - Test cases.
> - Addressed Joel's comments
> 
> For now the patch will publish a zero delay and return a response
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 07e65d4a54ba4eef5b787eba3e71cbe9f6a920bd 
>   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 3dc8b015afd2347a41c9a9dbc02b8e367da5f75f 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 8686d83aa52e435c6adafbe9ff4bd1602281072a 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java eb8951fba48c335095cc43fc3672de1c733e07ff 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java fabeae3083a8ea55cdacbb9568f3847ccd85bab4 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 37ec0b79beafcf5735c386b066eb319fb697eff5 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 
>   clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java 8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
>   clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java e3cc1967e407b64cc734548c19e30de700b64ba8 
>   core/src/main/scala/kafka/api/FetchRequest.scala 5b38f8554898e54800abd65a7415dd0ac41fd958 
>   core/src/main/scala/kafka/api/FetchResponse.scala 0b6b33ab6f7a732ff1322b6f48abd4c43e0d7215 
>   core/src/main/scala/kafka/api/ProducerRequest.scala c866180d3680da03e48d374415f10220f6ca68c4 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 5d1fac4cb8943f5bfaa487f8e9d9d2856efbd330 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala c16f7edd322709060e54c77eb505c44cbd77a4ec 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 83fc47417dd7fe4edf030217fa7fd69d99b170b0 
>   core/src/main/scala/kafka/server/DelayedFetch.scala de6cf5bdaa0e70394162febc63b50b55ca0a92db 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 05078b24ef28f2f4e099afa943e43f1d00359fda 
>   core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd 
>   core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 5717165f2344823fabe8f7cfafae4bb8af2d949a 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 00d59337a99ac135e8689bd1ecd928f7b1423d79 
> 
> Diff: https://reviews.apache.org/r/33378/diff/
> 
> 
> Testing
> -------
> 
> New tests added
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 33378: Patch for KAFKA-2136

Posted by Aditya Auradkar <aa...@linkedin.com>.

> On June 5, 2015, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/api/FetchResponse.scala, line 143
> > <https://reviews.apache.org/r/33378/diff/5/?file=956942#file956942line143>
> >
> >     follow-up

Can you elaborate?


> On June 5, 2015, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, line 58
> > <https://reviews.apache.org/r/33378/diff/5/?file=956947#file956947line58>
> >
> >     This is slightly unwieldy. Perhaps we can hold this patch especially since this will be impacted by the main patch (KAFKA-2084)

Sure, the plan is to commit this after the main patch. I could make this simpler by adding a class to represent the arguments for the callback, but I don't think it would add a great deal of value.


> On June 5, 2015, 2:43 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/api/ProducerResponse.scala, line 40
> > <https://reviews.apache.org/r/33378/diff/5/?file=956944#file956944line40>
> >
> >     We should do this based on the response version as well right?

As I understand it, readFrom is only used on the client side. A client running this code will only send v1 requests and will always get the throttleTime in return.
This isn't the case for consumers, because they will send an old version of the request from the ReplicaFetcherThread (if the intra.cluster.replication.protocol is old).
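
To make the version question concrete, here is a rough sketch of version-gated parsing. The layout is a deliberate simplification and an assumption (an int32 throttle time present only in v1, followed by an int16 error code); the names ThrottleTimeParsing and SimpleResponse are hypothetical and this is not the actual wire format or the code in the patch.

```java
import java.nio.ByteBuffer;

// Hypothetical, simplified response codec used only to illustrate the idea.
final class ThrottleTimeParsing {
    static final class SimpleResponse {
        final int throttleTimeMs;
        final short errorCode;

        SimpleResponse(int throttleTimeMs, short errorCode) {
            this.throttleTimeMs = throttleTimeMs;
            this.errorCode = errorCode;
        }
    }

    // Serialize: the throttle time is written only for version 1 and above.
    static ByteBuffer writeTo(SimpleResponse response, int version) {
        ByteBuffer buffer = ByteBuffer.allocate(version >= 1 ? 6 : 2);
        if (version >= 1) {
            buffer.putInt(response.throttleTimeMs);
        }
        buffer.putShort(response.errorCode);
        buffer.flip();
        return buffer;
    }

    // Deserialize: a v0 response carries no throttle time, so default to zero.
    static SimpleResponse readFrom(ByteBuffer buffer, int version) {
        int throttleTimeMs = version >= 1 ? buffer.getInt() : 0;
        short errorCode = buffer.getShort();
        return new SimpleResponse(throttleTimeMs, errorCode);
    }
}
```

If the reader always assumed v1, a v0 response would have its first bytes misread as the throttle time, which is why the reader needs to know the version whenever an old request can still be sent.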


- Aditya


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33378/#review86428
-----------------------------------------------------------

