Posted to dev@kafka.apache.org by Guozhang Wang <gu...@linkedin.com> on 2014/08/20 22:55:07 UTC

Re: Review Request 24676: WIP KAFKA-1583

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Aug. 20, 2014, 8:55 p.m.)


Review request for kafka.


Summary (updated)
-----------------

WIP KAFKA-1583


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

TBD


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, line 245
> > <https://reviews.apache.org/r/24676/diff/11/?file=724366#file724366line245>
> >
> >     Maybe use this:
> >     "Recorded replica %d log end offset (LEO)..."
> >     
> >     Also, instead of an explicit [%s,%d] format specifier I think we should start doing the following:
> >     
> >     "%s".format(TopicAndPartition(topic, partition))
> >     
> >     That way we ensure a canonical toString for topic/partition pairs and can change it in one place in the future.
> >     
> >     There are some places where we don't log with this agreed-upon format and it is a bit annoying, so going forward I think we should use the above. Can we add it to the logging improvements wiki?

Updated the logging wiki. We can refer people to it when we make logging format comments moving forward.
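
A minimal sketch of the suggestion above, assuming a local stand-in for TopicAndPartition rather than the real Kafka class. The "[topic,partition]" layout mirrors the [%s,%d] specifier mentioned in the comment; the describeReplica helper and its message text are hypothetical.

```scala
// Local stand-in for a topic/partition pair, defined here so the sketch is
// self-contained. The toString is the single place that fixes the log format.
case class TopicAndPartition(topic: String, partition: Int) {
  override def toString: String = "[%s,%d]".format(topic, partition)
}

object LogFormatSketch {
  // Hypothetical helper: the message uses "%s" and relies on the canonical
  // toString instead of spelling out "[%s,%d]" at every call site.
  def describeReplica(replicaId: Int, topic: String, partition: Int): String =
    "replica %d of partition %s".format(replicaId, TopicAndPartition(topic, partition))
}
```

With this shape, changing the topic/partition rendering later only requires touching the one toString.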


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, line 259
> > <https://reviews.apache.org/r/24676/diff/11/?file=724366#file724366line259>
> >
> >     Since we still may update the HW shall we rename this to maybeUpdateHWAndExpandIsr

I changed the name because the original one was a bit misleading: it suggested that only this function could update the HW. Instead, I added comments to functions such as expandISR and updateHW describing which logic may trigger them.


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, line 99
> > <https://reviews.apache.org/r/24676/diff/11/?file=724370#file724370line99>
> >
> >     I'm a bit confused by case C. It can also happen if the delayed fetch happens to straddle a segment roll event; the comment seems a bit misleading/incomplete without that.
> >     
> >     In fact, if it is lagging shouldn't it have been satisfied immediately without having to create a DelayedFetch in the first place?

The fetch could be lagging on one partition, yet that partition alone may not supply enough data to satisfy fetch.min.bytes because the other partitions are all caught up. I reworded the comments a bit.
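
The point about fetch.min.bytes can be sketched roughly as follows. This is an illustration only: the object, the function name and the per-partition byte map are assumptions, not the broker's actual data structures.

```scala
object FetchMinBytesSketch {
  // availableBytes: bytes currently readable for each partition named in the
  // fetch (hypothetical input; a real broker would derive this from offsets).
  // The fetch can be answered right away only if the total across all
  // partitions reaches minBytes.
  def canRespondImmediately(availableBytes: Map[String, Long], minBytes: Long): Boolean =
    availableBytes.values.sum >= minBytes
}
```

A fetch that lags on one partition by 100 bytes while the other partitions are caught up would still be delayed when the minimum is 1024 bytes, which is the situation described above.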


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 187
> > <https://reviews.apache.org/r/24676/diff/11/?file=724373#file724373line187>
> >
> >     Why is this additional logging necessary?
> >     
> >     KafkaApis currently has catch-all for unhandled exceptions.
> >     
> >     Error codes can be inspected via public access logs if required right?

The exception is already caught in the replica manager, which does not re-throw it but only sets the error code. Hence the request log will not record this as a failed request.


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 423
> > <https://reviews.apache.org/r/24676/diff/11/?file=724373#file724373line423>
> >
> >     Are these changes intentional?

Yes. According to our logging wiki these should be at debug level since they are not server-side errors.


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 46
> > <https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line46>
> >
> >     Should we rename ReplicaManager to ReplicatedLogManager?

I am going to do all the renaming in a follow-up JIRA.


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 261
> > <https://reviews.apache.org/r/24676/diff/11/?file=724373#file724373line261>
> >
> >     I'm not sure how scala treats this under the hood, but it _has_ to hold a reference to request until the callback is executed. i.e., we probably still want to empty the request data.

Good point!
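
To illustrate the concern, here is a small sketch with hypothetical names (Request, emptyData, makeCallback, handle). It assumes a request object with a mutable payload; it is not the actual KafkaApis code.

```scala
object RequestRetentionSketch {
  // Hypothetical stand-in for a produce request carrying a large payload.
  final class Request(val correlationId: Int, var data: Map[String, Array[Byte]]) {
    // Drops the payload so it can be garbage collected.
    def emptyData(): Unit = { data = Map.empty }
  }

  // The returned function closes over `request`, so the request object stays
  // reachable for as long as the callback itself is referenced.
  def makeCallback(request: Request): () => String =
    () => "response for correlation id %d".format(request.correlationId)

  def handle(request: Request): () => String = {
    val callback = makeCallback(request)
    // ... the messages would be appended to the log here ...
    request.emptyData() // release the payload; the callback only needs the id
    callback
  }
}
```

Because the callback may sit in a purgatory for some time, emptying the payload keeps the retained request small even though the reference itself cannot be dropped.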


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 120
> > <https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line120>
> >
> >     (for regular consumer fetch)

Actually this is for both consumer / follower fetch


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 265
> > <https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line265>
> >
> >     This is old code and we don't need to address it in this patch, but I was wondering if it makes sense to respond sooner if there is at least one error in the local append. What do you think? i.e., I don't remember a good reason for holding on to the request if there are i < numPartitions errors in local append.

I think today we are already responding immediately after a failure in local append, right?


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/utils/DelayedItem.scala, line 23
> > <https://reviews.apache.org/r/24676/diff/11/?file=724378#file724378line23>
> >
> >     We don't really need this class anymore and it can be folded into DelayedRequest right?

I am going to do this in a follow-up JIRA.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
-----------------------------------------------------------


On Oct. 23, 2014, 1:53 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 23, 2014, 1:53 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporate Joel's comments after rebase
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Joel Koshy <jj...@gmail.com>.

> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 120
> > <https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line120>
> >
> >     (for regular consumer fetch)
> 
> Guozhang Wang wrote:
>     Actually this is for both consumer / follower fetch

Follower fetches are not going to be blocked based on the HW; i.e., the available bytes for follower fetches are computed up to the log end offset.
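
A rough sketch of that distinction, using offsets instead of bytes for simplicity. The function names and parameters are assumptions made for illustration.

```scala
object FetchBoundSketch {
  // Upper bound on what a fetch may read: followers read up to the log end
  // offset (LEO), regular consumers only up to the high watermark (HW).
  def readableUpTo(isFollowerFetch: Boolean, highWatermark: Long, logEndOffset: Long): Long =
    if (isFollowerFetch) logEndOffset else highWatermark

  // Number of offsets available to the fetch; never negative.
  def availableOffsets(fetchOffset: Long, isFollowerFetch: Boolean,
                       highWatermark: Long, logEndOffset: Long): Long =
    math.max(0L, readableUpTo(isFollowerFetch, highWatermark, logEndOffset) - fetchOffset)
}
```

With HW = 10 and LEO = 15, a consumer fetching from offset 10 sees nothing available, while a follower fetching from the same offset sees five offsets.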


> On Oct. 22, 2014, 1:46 a.m., Joel Koshy wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 265
> > <https://reviews.apache.org/r/24676/diff/11/?file=724376#file724376line265>
> >
> >     This is old code and we don't need to address it in this patch, but I was wondering if it makes sense to respond sooner if there is at least one error in the local append. What do you think? i.e., I don't remember a good reason for holding on to the request if there are i < numPartitions errors in local append.
> 
> Guozhang Wang wrote:
>     I think today we are already responding immediately after a failure in local append, right?

Yeah that was my question: from the code above it does not seem to be the case. If so, could you file a jira to fix that?


- Joel


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
-----------------------------------------------------------


Re: Review Request 24676: Rebase KAFKA-1583

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
-----------------------------------------------------------


Very nicely done.

These are all minor comments, except for one about emptying the producer request, which should be easily fixable if it is an issue. (It is the top comment.)


core/src/main/scala/kafka/api/ProducerRequest.scala
<https://reviews.apache.org/r/24676/#comment98557>

    I have a concern that this may actually be still needed. See comment under handleProducerRequest.sendResponseCallback



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment97784>

    Maybe use this:
    "Recorded replica %d log end offset (LEO)..."
    
    Also, instead of an explicit [%s,%d] format specifier I think we should start doing the following:
    
    "%s".format(TopicAndPartition(topic, partition))
    
    That way we ensure a canonical toString for topic/partition pairs and can change it in one place in the future.
    
    There are some places where we don't log with this agreed-upon format and it is a bit annoying, so going forward I think we should use the above. Can we add it to the logging improvements wiki?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment97788>

    Since we still may update the HW shall we rename this to maybeUpdateHWAndExpandIsr



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/24676/#comment97797>

    Since this contains hw (which is a replication detail) should it really be in the replica manager?



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97809>

    How about just calling this responseCallback? It is slightly confusing to see references to callbackOnComplete and onComplete in the same class.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97801>

    The earlier comment was useful. i.e., (in which case we return whatever data is available for the partitions that are currently led by this broker)



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97805>

    I'm a bit confused by case C. It can also happen if the delayed fetch happens to straddle a segment roll event; the comment seems a bit misleading/incomplete without that.
    
    In fact, if it is lagging shouldn't it have been satisfied immediately without having to create a DelayedFetch in the first place?



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment98139>

    Similar comment as in DelayedFetch on naming this.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98165>

    Why is this additional logging necessary?
    
    KafkaApis currently has catch-all for unhandled exceptions.
    
    Error codes can be inspected via public access logs if required right?



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98166>

    Same here.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98558>

    I'm not sure how scala treats this under the hood, but it _has_ to hold a reference to request until the callback is executed. i.e., we probably still want to empty the request data.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98180>

    to fetch messages



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment98182>

    Are these changes intentional?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment98184>

    commitStatusView
    
    Also, can we just compute the final status once at the end as opposed to preparing an initial response status and modifying later?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment98194>

    Do you think it would be clearer to name this onAppend or something similar?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment97799>

    Should we rename ReplicaManager to ReplicatedLogManager?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment98372>

    (for regular consumer fetch)



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment98380>

    This is old code and we don't need to address it in this patch, but I was wondering if it makes sense to respond sooner if there is at least one error in the local append. What do you think? i.e., I don't remember a good reason for holding on to the request if there are i < numPartitions errors in local append.



core/src/main/scala/kafka/utils/DelayedItem.scala
<https://reviews.apache.org/r/24676/#comment98553>

    We don't really need this class anymore and it can be folded into DelayedRequest right?



core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
<https://reviews.apache.org/r/24676/#comment98554>

    weird comment


- Joel Koshy


On Oct. 17, 2014, 4:56 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 17, 2014, 4:56 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporate Jun's comments round two after rebase
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Rebase KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57178
-----------------------------------------------------------


Thanks for the patch. +1 after addressing a couple more minor comments below.

Also, do you plan to have a followup jira to rename request to operation globally?


core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment97691>

    set the response



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment97709>

    Reworded the explanation as follows. Does it look ok?
     * 
     * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
     * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either 
     * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
     * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
     * forceComplete(). A subclass of DelayedRequest needs to provide an implementation of both onComplete() and
     * tryComplete().
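
The lifecycle described in this comment could be sketched as below. This is a simplified stand-in, not the patch itself: it leaves out the purgatory, the timer and delayMs, and the class names DelayedOperationSketch and ByteThresholdOperation are hypothetical.

```scala
import java.util.concurrent.atomic.AtomicBoolean

abstract class DelayedOperationSketch {
  private val completed = new AtomicBoolean(false)

  def isCompleted(): Boolean = completed.get()

  // Completion logic supplied by subclasses; runs at most once.
  protected def onComplete(): Unit

  // Checks whether the operation can complete now and, if so, calls forceComplete().
  def tryComplete(): Boolean

  // Completes the operation unless it has already completed. Only the caller
  // that wins the compare-and-set runs onComplete(), which gives "exactly once".
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false
}

// Hypothetical subclass: completes once enough bytes have accumulated.
class ByteThresholdOperation(minBytes: Long) extends DelayedOperationSketch {
  @volatile var accumulated: Long = 0L
  var completions: Int = 0 // counts onComplete() calls, for illustration only

  protected def onComplete(): Unit = completions += 1

  def tryComplete(): Boolean =
    if (accumulated >= minBytes) forceComplete() else false
}
```

The compare-and-set is one way to ensure that a timeout calling forceComplete() and a concurrent tryComplete() cannot both run onComplete().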


- Jun Rao


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Oct. 31, 2014, 6:08 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, line 236
> > <https://reviews.apache.org/r/24676/diff/13/?file=735940#file735940line236>
> >
> >     Could we get rid of = since this method is supposed to not return any value?

Thanks Jun. I will address this comment in KAFKA-1720
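
For context on the comment above: in Scala, a method defined with = and no declared result type gets its type inferred from the last expression of its body, so it can return a value unintentionally. The sketch below uses hypothetical names and shows an explicit Unit result type as one way to state the intent. The older procedure syntax without =, which the comment presumably refers to, had the same effect in the Scala versions of that time.

```scala
object ReturnTypeSketch {
  private var logEndOffset: Long = 0L

  // With `=` and no declared type, the result type is inferred from the last
  // expression, so this method returns a Long even if no caller needs it.
  def updateAndLeak(offset: Long) = {
    logEndOffset = offset
    logEndOffset
  }

  // Declaring Unit states that the method is called only for its side effect.
  def update(offset: Long): Unit = {
    logEndOffset = offset
  }

  def current: Long = logEndOffset
}
```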


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review59370
-----------------------------------------------------------


On Oct. 28, 2014, 10:09 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 28, 2014, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel's comments round two
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review59370
-----------------------------------------------------------


Just one minor comment. Perhaps we can address it in a future patch.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment100644>

    Could we get rid of = since this method is supposed to not return any value?


- Jun Rao


On Oct. 28, 2014, 10:09 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 28, 2014, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel's comments round two
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review59109
-----------------------------------------------------------

Ship it!


- Joel Koshy


On Oct. 28, 2014, 10:09 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 28, 2014, 10:09 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Joel's comments round two
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Oct. 28, 2014, 10:09 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporated Joel's comments round two


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review58865
-----------------------------------------------------------

Ship it!


Looks good - just minor comments which we can fix on check-in.


core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment100026>

    Is there a benefit to using mapValues (vs. map) here, since we aren't doing any transformation per se? That is, we need to materialize these anyway.
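
As background for this question, here is a small sketch of how the two calls differ. The names (`MapValuesSketch`, `calls`, `expensive`) are invented for illustration and this is not the OffsetManager code. In the Scala versions of that era, `mapValues` returned a lazy view that re-applied the function on each lookup, whereas `map` builds a new map eagerly. On Scala 2.13 and later the lazy form is spelled `.view.mapValues`, which is what the sketch assumes.

```scala
object MapValuesSketch {
  // Counts how often the transformation below has run.
  var calls = 0

  // Stand-in for a per-value transformation.
  def expensive(v: Int): Int = { calls += 1; v * 10 }

  val source = Map("a" -> 1, "b" -> 2)

  // Lazy: nothing is computed here; `expensive` runs on every lookup.
  val lazyView = source.view.mapValues(expensive)

  // Strict: `expensive` runs once per entry, at construction time.
  val strict = source.map { case (k, v) => k -> expensive(v) }
}
```

If the values have to be materialized anyway, the strict `map` pays the cost once, while repeated reads through the lazy view pay it on each access.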



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment100018>

    Minor comment: since we are returning metadatatoolarge, validateOffsetMetadata is an odd name. We could just rename it to validateOffsetMetadataLength; we can take care of that on check-in.


- Joel Koshy


On Oct. 23, 2014, 1:53 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 23, 2014, 1:53 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporate Joel's comments after rebase
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Oct. 23, 2014, 1:53 a.m.)


Review request for kafka.


Summary (updated)
-----------------

Fix KAFKA-1583


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporate Joel's comments after rebase


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Rebase KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Oct. 17, 2014, 4:56 p.m.)


Review request for kafka.


Summary (updated)
-----------------

Rebase KAFKA-1583


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporate Jun's comments round two after rebase


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57147
-----------------------------------------------------------


Thanks for the patch. A few more comments.


core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment97617>

    typo uni



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment97628>

    Instead of saying "return error", it's more accurate to say "set an error in response".



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment97627>

    It's not very clear how forceComplete, tryComplete, complete, and onComplete relate to one another and how they should be used together. Perhaps we can add an explanation here?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment97626>

    How about we rename this to onComplete()?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment97619>

    The description of the return value is not quite right. It should be the same as forceComplete(): return true iff the operation is completed by the caller.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment97620>

    can be completed => can be completed by the caller
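
One possible reading of the relationship discussed in the RequestPurgatory comments above, as a rough sketch only. The class names (`DelayedOpSketch`, `CountdownOp`) and the ack-counting condition are assumptions made for illustration; this is not the code under review. The sketch assumes that `forceComplete()` is the single gate that runs `onComplete()` at most once and returns true iff the caller is the one that completed the operation, and that `tryComplete()` checks a condition and then goes through that same gate.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical base class illustrating the completion contract.
abstract class DelayedOpSketch {
  private val completed = new AtomicBoolean(false)

  // Completion logic; invoked at most once, by whichever caller wins.
  protected def onComplete(): Unit

  // Checks the completion condition and, if it holds, calls forceComplete().
  // Returns true iff this call completed the operation.
  def tryComplete(): Boolean

  // Completes unconditionally (for example on timeout).
  // Returns true iff this call completed the operation.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) {
      onComplete()
      true
    } else false

  def isCompleted: Boolean = completed.get()
}

// Hypothetical operation that completes once enough acks have arrived.
final class CountdownOp(required: Int) extends DelayedOpSketch {
  private val acks = new AtomicInteger(0)
  val responsesSent = new AtomicInteger(0)

  def ack(): Unit = acks.incrementAndGet()

  protected def onComplete(): Unit = responsesSent.incrementAndGet()

  def tryComplete(): Boolean =
    if (acks.get() >= required) forceComplete() else false
}
```

Under these assumptions, a later `forceComplete()` from an expiry thread returns false once `tryComplete()` has already succeeded, so the response is produced only once.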


- Jun Rao


On Oct. 17, 2014, 4:15 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 17, 2014, 4:15 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporate Jun's comments after rebase
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Oct. 17, 2014, 4:15 a.m.)


Review request for kafka.


Summary (updated)
-----------------

Fix KAFKA-1583


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporate Jun's comments after rebase


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Rebase KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Oct. 16, 2014, 1:29 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 167
> > <https://reviews.apache.org/r/24676/diff/9/?file=720184#file720184line167>
> >
> >     Should "replica manager" be "offset manager"?

This is actually "replica manager": the comment refers to the step where the replica manager writes the commit message to the local log. I have changed the comment a bit to make it clearer.
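
As a rough illustration of the ordering described in this reply, the sketch below appends the commit message to a local log first and updates an offset cache only after the append succeeds. All names in it (LocalLog, OffsetCache, OffsetCommitFlow) are hypothetical stand-ins chosen for the example; it is not the actual KafkaApis, ReplicaManager, or OffsetManager code.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are not Kafka classes.
final case class TopicAndPartition(topic: String, partition: Int)

// Plays the role of the replica manager: appends to the local log, then reports the outcome.
class LocalLog {
  val entries: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String]

  def append(message: String)(onComplete: Boolean => Unit): Unit = {
    entries += message
    onComplete(true) // this in-memory sketch never fails
  }
}

// Plays the role of the offset manager: caches the latest committed offset.
class OffsetCache {
  private val offsets = mutable.Map.empty[(String, TopicAndPartition), Long]

  def put(group: String, tp: TopicAndPartition, offset: Long): Unit =
    offsets((group, tp)) = offset

  def get(group: String, tp: TopicAndPartition): Option[Long] =
    offsets.get((group, tp))
}

object OffsetCommitFlow {
  // Write the commit message to the log first; update the cache only on success.
  def commit(group: String, tp: TopicAndPartition, offset: Long,
             log: LocalLog, cache: OffsetCache): Unit = {
    val message = s"$group/${tp.topic}-${tp.partition}@$offset"
    log.append(message) { succeeded =>
      if (succeeded) cache.put(group, tp, offset)
    }
  }
}
```

In this sketch the cache is touched only inside the completion callback, which is why a code comment at the append step would name the log-writing component rather than the cache.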


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review56843
-----------------------------------------------------------


On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 14, 2014, 2:42 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Rebase KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review56843
-----------------------------------------------------------


Thanks for the patch. Looks good to me. I only have some minor comments below.


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97277>

    typo: is does not



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97280>

    Typo: the new the fetch operation



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97278>

    This seems to be case B.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment97279>

    This seems to be case A.



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment97282>

    This doesn't match the comment. The error code is returned from checkEnoughReplicasReachOffset, not from writing to local log.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment97283>

    Should "replica manager" be "offset manager"?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment97287>

    Perhaps add a comment that the flag is for testing purposes only.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment97288>

    Could we comment on the return value?
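
On the DelayedProduce comment above: the distinction is that one error can come from the local append, while a different one can come from the later replication check. The sketch below is a simplified, hypothetical model of such a check that takes requiredAcks and a minimum ISR size and returns a (satisfied, error) pair. The types PartitionState and ProduceError and the object ReplicationCheck are assumptions made for this example; only the name checkEnoughReplicasReachOffset comes from the review, and this is not the actual Partition.scala code.

```scala
// Simplified, hypothetical model; not the actual kafka.cluster.Partition code.
sealed trait ProduceError
case object NoError extends ProduceError
case object NotLeaderForPartition extends ProduceError
case object NotEnoughReplicasAfterAppend extends ProduceError

// Illustrative view of one partition on the broker.
final case class PartitionState(
  isLeader: Boolean,
  highWatermark: Long,
  followerLogEndOffsets: Seq[Long], // in-sync followers only, leader excluded
  minIsr: Int
)

object ReplicationCheck {
  // Returns (satisfied, error) for a produce request waiting on requiredOffset.
  def checkEnoughReplicasReachOffset(
      state: PartitionState,
      requiredOffset: Long,
      requiredAcks: Int): (Boolean, ProduceError) = {
    if (!state.isLeader) {
      (false, NotLeaderForPartition)
    } else {
      // The leader counts as one in-sync replica and one ack.
      val isrSize = state.followerLogEndOffsets.size + 1
      val numAcks = state.followerLogEndOffsets.count(_ >= requiredOffset) + 1
      if (requiredAcks < 0 && state.highWatermark >= requiredOffset) {
        // All in-sync replicas caught up; the ISR may still have shrunk below minIsr.
        if (isrSize >= state.minIsr) (true, NoError)
        else (true, NotEnoughReplicasAfterAppend)
      } else if (requiredAcks > 0 && numAcks >= requiredAcks) {
        (true, NoError)
      } else {
        (false, NoError)
      }
    }
  }
}
```

Under these assumptions, the error in the returned pair is produced by the replication check itself, after the local append has already happened, so a comment attached to it would describe the check rather than the log write.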


- Jun Rao


On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Oct. 14, 2014, 2:42 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
>   core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
>   core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
>   core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Rebase KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Oct. 14, 2014, 2:42 a.m.)


Review request for kafka.


Summary (updated)
-----------------

Rebase KAFKA-1583


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c 
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 
  core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Sept. 5, 2014, 9:55 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Rebase on KAFKA-1616 and minor changes for unit tests


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala cf3ed4c8f197d1197658645ccb55df0bce86bdd4 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Sept. 5, 2014, 9:08 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Rebase on KAFKA-1616 for checking diff files; please do not review.


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala cf3ed4c8f197d1197658645ccb55df0bce86bdd4 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Sept. 3, 2014, 6:23 p.m., Jun Rao wrote:
> > Looks good. I only have the following minor comments.

Thanks Jun. If there are no more comments for now, I will wait for KAFKA-1616 to be checked in first, then do the rebase and the class / function renaming (which will make the diff file quite hard to review), and modify the comments accordingly.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52092
-----------------------------------------------------------


On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Sept. 2, 2014, 8:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments round three
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
>   core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
>   core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Sept. 3, 2014, 6:23 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, line 100
> > <https://reviews.apache.org/r/24676/diff/6/?file=674148#file674148line100>
> >
> >     Should we log topicAndPartition as well?

fetchMetadata includes fetchPartitionStatus, which contains the mapping of topicAndPartition to fetchPartitionStatus, so topicAndPartition is already part of what gets logged.
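
A minimal sketch of why that is enough, assuming case-class-like shapes: logging the metadata object prints its map, and the map prints its keys. The field lists below are simplified and hypothetical (the real classes carry more fields), and FetchLogging.describe is a made-up helper for the example.

```scala
// Simplified, hypothetical shapes; the real classes carry more fields.
final case class TopicAndPartition(topic: String, partition: Int)
final case class FetchPartitionStatus(startOffset: Long, fetchSize: Int)
final case class FetchMetadata(
  fetchMinBytes: Int,
  fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]
)

object FetchLogging {
  // Case classes and Map print their contents, so the map keys appear in the message.
  def describe(metadata: FetchMetadata): String =
    s"Fetch request satisfied: $metadata"
}
```

With these definitions, describing a FetchMetadata whose map holds TopicAndPartition("events", 0) yields a message containing that key, so no separate topicAndPartition argument is needed in the log call.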


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52092
-----------------------------------------------------------


On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Sept. 2, 2014, 8:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments round three
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
>   core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
>   core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52092
-----------------------------------------------------------


Looks good. I only have the following minor comments.


core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment90828>

    The comment is not very accurate since it applies to regular consumers as well.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment90829>

    Should we log topicAndPartition as well?


- Jun Rao


On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Sept. 2, 2014, 8:37 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments round three
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
>   core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
>   core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Sept. 2, 2014, 8:37 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporated Jun's comments round three


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote:
> > Are you including the changes in kafka-1616 too? That would be fine. However, the comments in the other jira also need to be addressed in this patch.

I was not intending to include the changes of KAFKA-1616. The plan is to first check in K-1616, then rebase K-1583 on that. However, some of the changes may reflect review comments from K-1616, just for ease of rebasing. I can revert these if you want.


> On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, line 282
> > <https://reviews.apache.org/r/24676/diff/5/?file=673601#file673601line282>
> >
> >     may now in => may not be in

I think it should be "may now be in"?


> On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 343-349
> > <https://reviews.apache.org/r/24676/diff/5/?file=673612#file673612line343>
> >
> >     Not sure if we need the while loop any more. The comments may also need to be adjusted.

Good point. For the case "curr != null && curr.forceComplete() == false" we can just return as well.
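
To illustrate why returning is safe in that case, here is a minimal sketch, assuming forceComplete() is guarded by an atomic flag. The class name DelayedOperationSketch and the CountingOperation demo are hypothetical and are not the code in the patch; only the four method names come from this review. Under that assumption, a false return means another caller already completed the operation, so complete() runs exactly once and nothing is left to retry.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a delayed operation; not the class from the patch.
abstract class DelayedOperationSketch {
  private val completed = new AtomicBoolean(false)

  // Subclass hook: the actual completion logic (e.g. sending a response).
  def complete(): Unit

  // Subclass hook: check the completion criteria and, if met, call forceComplete().
  def tryComplete(): Boolean

  // Returns true only for the single caller that flips the flag;
  // every later caller gets false and complete() is not run again.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) {
      complete()
      true
    } else false

  def isCompleted(): Boolean = completed.get()
}

object DelayedOperationDemo {
  // Counts how many times complete() ran (single-threaded demo only).
  class CountingOperation extends DelayedOperationSketch {
    var completions = 0
    def complete(): Unit = completions += 1
    // In this demo the criteria are always met, so just try to complete.
    def tryComplete(): Boolean = forceComplete()
  }
}
```

With this shape, a caller that sees forceComplete() return false can simply return, since the operation has already been completed elsewhere.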


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52044
-----------------------------------------------------------




Re: Review Request 24676: Fix KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review52044
-----------------------------------------------------------


Are you including the changes in kafka-1616 too? That would be fine. However, the comments in the other jira also need to be addressed in this patch.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment90780>

    may now in => may not be in



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment90774>

    Force completing the delayed operation, if not already completed.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment90775>

    and will be called exactly once.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment90779>

    Not sure if we need the while loop any more. The comments may also need to be adjusted.


- Jun Rao


On Sept. 2, 2014, 1:09 a.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Sept. 2, 2014, 1:09 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments round two.
> 
> Incorporated Jun's comments.
> 
> 1. I left some cases in Log since they are return values for some of its APIs.
> 2. I kept the fetch info in the delayed fetch metadata since it needs to be used for re-reading the log.
> 3. I kept the name "callbackOnComplete", following the principle that only the caller knows what the callback is used for: callers can name the callback responseCallback (in KafkaApis) or putCacheCallback (in OffsetManager), while all callees take the callback as "callbackOnComplete".
> 
> Unit tests passed, with some other notes:
> 
> 1. Found and fixed a bug in the current delayed fetch satisfaction logic: previously, when calculating the bytes, we did not take fetchMaxBytes into consideration as an upper limit for a single partition's log, but simply took the diff between the current HW/LEO and the fetch offset.
> 2. Found and fixed a bug in the unit tests: we used to create the replica manager on the fly but did not shut it down upon completing the test, which leaks the background thread (i.e. the reaper thread of the purgatory).
> 3. Changed the RequestPurgatory API a bit per Jun's comments. Now it has two implemented functions, forceComplete() and isCompleted(), and two functions that need to be implemented in the subclasses, tryComplete() and complete(). Please let me know if people have more comments on the current API.
> 4. Cleaned up the SimpleFetch test; previously this test was too complicated, but it actually just tests a simple piece of replica manager logic.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
>   core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
>   core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Sept. 2, 2014, 1:09 a.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporated Jun's comments round two.

Incorporated Jun's comments.

1. I left some cases in Log since they are return values for some of its APIs.
2. I kept the fetch info in the delayed fetch metadata since it needs to be used for re-reading the log.
3. I kept the name "callbackOnComplete", following the principle that only the caller knows what the callback is used for: callers can name the callback responseCallback (in KafkaApis) or putCacheCallback (in OffsetManager), while all callees take the callback as "callbackOnComplete".
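
The naming principle in item 3 can be sketched as follows. This is only an illustration: appendAndNotify, handleProduce, storeOffsets, and the status map of Int error codes are hypothetical stand-ins, not the signatures in the patch. Only the callback names callbackOnComplete, responseCallback, and putCacheCallback come from the description.

```scala
import scala.collection.mutable

object CallbackNamingSketch {
  // Callee (hypothetical): it only knows the callback runs on completion,
  // so the parameter keeps the generic name callbackOnComplete.
  def appendAndNotify(keys: Seq[String],
                      callbackOnComplete: Map[String, Int] => Unit): Unit = {
    // 0 stands for "no error" in this sketch.
    val status = keys.map(k => k -> 0).toMap
    callbackOnComplete(status)
  }

  // Caller 1: knows the callback builds a response, so names it accordingly.
  def handleProduce(keys: Seq[String]): String = {
    var response = ""
    def responseCallback(status: Map[String, Int]): Unit =
      response = s"produce response for ${status.size} message(s)"
    appendAndNotify(keys, responseCallback)
    response
  }

  // Caller 2: knows the callback populates a cache, so names it accordingly.
  def storeOffsets(keys: Seq[String], cache: mutable.Set[String]): Unit = {
    def putCacheCallback(status: Map[String, Int]): Unit =
      status.foreach { case (key, error) => if (error == 0) cache += key }
    appendAndNotify(keys, putCacheCallback)
  }
}
```

Both callers reuse the same callee; only the name at the call site tells the reader what completion means in that context.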

Unit tests passed, with some other notes:

1. Found and fixed a bug in the current delayed fetch satisfaction logic: previously, when calculating the bytes, we did not take fetchMaxBytes into consideration as an upper limit for a single partition's log, but simply took the diff between the current HW/LEO and the fetch offset.
2. Found and fixed a bug in the unit tests: we used to create the replica manager on the fly but did not shut it down upon completing the test, which leaks the background thread (i.e. the reaper thread of the purgatory).
3. Changed the RequestPurgatory API a bit per Jun's comments. Now it has two implemented functions, forceComplete() and isCompleted(), and two functions that need to be implemented in the subclasses, tryComplete() and complete(). Please let me know if people have more comments on the current API.
4. Cleaned up the SimpleFetch test; previously this test was too complicated, but it actually just tests a simple piece of replica manager logic.
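
As a rough illustration of the fix in note 1, the sketch below caps each partition's contribution at its fetchMaxBytes before summing. The PartitionFetch type, the availableBytes field (standing for the diff between the current HW/LEO and the fetch offset), and the minBytes threshold are assumptions made for this example; the actual patch may compute these differently.

```scala
object FetchBytesSketch {
  // Hypothetical per-partition view of a pending fetch.
  // availableBytes: bytes between the fetch offset and the current HW/LEO.
  // fetchMaxBytes:  the most this fetch may return for the partition.
  final case class PartitionFetch(availableBytes: Long, fetchMaxBytes: Int)

  // Old behaviour as described: sum the raw diffs, ignoring the per-partition cap.
  def accumulatedUncapped(partitions: Seq[PartitionFetch]): Long =
    partitions.map(_.availableBytes).sum

  // Fixed behaviour: a partition can contribute at most fetchMaxBytes.
  def accumulatedCapped(partitions: Seq[PartitionFetch]): Long =
    partitions.map(p => math.min(p.availableBytes, p.fetchMaxBytes.toLong)).sum

  // The delayed fetch is satisfied once enough returnable bytes have accumulated.
  def isSatisfied(partitions: Seq[PartitionFetch], minBytes: Int): Boolean =
    accumulatedCapped(partitions) >= minBytes
}
```

For example, with one partition holding 10000 available bytes and another holding 100, both capped at 1024, the uncapped sum is 10100 while the capped sum is 1124, so a fetch with a 2000-byte minimum would no longer be treated as satisfied.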


Diffs
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Sept. 2, 2014, 1:07 a.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

TBD


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51895
-----------------------------------------------------------



core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
<https://reviews.apache.org/r/24676/#comment90573>

    It would be useful to add a text description in the message string.


- Jun Rao


On Aug. 27, 2014, 5 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/24676/
> -----------------------------------------------------------
> 
> (Updated Aug. 27, 2014, 5 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1583
>     https://issues.apache.org/jira/browse/KAFKA-1583
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Incorporated Jun's comments.
> 
> 1. I left some cases in Log since they are return values for some of its APIs.
> 2. I kept the fetch info in the delayed fetch metadata since it needs to be used for re-reading the log.
> 3. I kept the name "callbackOnComplete", following the principle that only the caller knows what the callback is used for: callers can name the callback responseCallback (in KafkaApis) or putCacheCallback (in OffsetManager), while all callees take the callback as "callbackOnComplete".
> 
> Unit tests passed, with some other notes:
> 
> 1. Found and fixed a bug in the current delayed fetch satisfaction logic: previously, when calculating the bytes, we did not take fetchMaxBytes into consideration as an upper limit for a single partition's log, but simply took the diff between the current HW/LEO and the fetch offset.
> 2. Found and fixed a bug in the unit tests: we used to create the replica manager on the fly but did not shut it down upon completing the test, which leaks the background thread (i.e. the reaper thread of the purgatory).
> 3. Changed the RequestPurgatory API a bit per Jun's comments. Now it has two implemented functions, forceComplete() and isCompleted(), and two functions that need to be implemented in the subclasses, tryComplete() and complete(). Please let me know if people have more comments on the current API.
> 4. Cleaned up the SimpleFetch test; previously this test was too complicated, but it actually just tests a simple piece of replica manager logic.
> 
> One concern I have now is the on-the-fly creation of a new callback function (i.e. the "def" inside the handling functions and the offset manager's storeOffset function): when I run the unit tests with the patch, it seems to cause higher CPU consumption than trunk. Could someone be another pair of eyes, run the unit tests, and check the CPU usage?
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
>   core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
>   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
>   core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
>   core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
>   core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
>   core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
>   core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
>   core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
>   core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
>   core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
>   core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
>   core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
>   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 
> 
> Diff: https://reviews.apache.org/r/24676/diff/
> 
> 
> Testing
> -------
> 
> Unit tests
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 725
> > <https://reviews.apache.org/r/24676/diff/3-4/?file=666252#file666252line725>
> >
> >     Do we need to add the new parameter? Does it hurt to write the checkpoint file in unit tests?

The reason is that in some unit tests with mocks, the checkpoint function will try to access Log.dir(), which is not easy to mock.


> On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala, lines 171-172
> > <https://reviews.apache.org/r/24676/diff/3-4/?file=666256#file666256line171>
> >
> >     Would it be better to return the correct offset and just return an empty MessageSet? The equality test can be on the offset.

The LogOffsetMetadata is only for the fetching start offset, which would be 0 for both cases; we need to make sure that the fetched data's end offset does not exceed the limit.


> On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, line 169
> > <https://reviews.apache.org/r/24676/diff/4/?file=670284#file670284line169>
> >
> >     Yes, I think this should be debug level logging.

I will fix this in the other request-handling places as well.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51855
-----------------------------------------------------------




Re: Review Request 24676: Fix KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51855
-----------------------------------------------------------



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment90483>

    Not sure we need what's in the bracket.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment90486>

    a offset => an offset
    request is used twice.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment90480>

    No need to explain sth that's no longer there.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment90488>

    Do we need to add the new parameter? Does it hurt to write the checkpoint file in unit tests?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment90481>

    Could we make this method return void?



core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
<https://reviews.apache.org/r/24676/#comment90489>

    Would it be better to return the correct offset and just return an empty MessageSet? The equality test can be on the offset.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment90490>

    Yes, I think this should be debug level logging.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment90492>

    need => needs



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment90493>

    a operation => an operation
    
    It would also be good to add that "complete()" will be called exactly once.
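
The "exactly once" guarantee for complete() can be sketched as below. This is only an illustration under assumptions, not the code in the patch: the class names DelayedOperation and CountingOperation are hypothetical, and the use of an AtomicBoolean compare-and-set for the completed flag is one possible way to obtain the guarantee.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object DelayedOperationSketch {
  // Hypothetical base class; method names follow the thread, not the actual patch.
  abstract class DelayedOperation {
    private val completed = new AtomicBoolean(false)

    // Only the caller that wins the compare-and-set runs complete(),
    // so complete() is invoked at most once per operation.
    def forceComplete(): Boolean =
      if (completed.compareAndSet(false, true)) { complete(); true }
      else false

    def isCompleted: Boolean = completed.get()

    // Hooks to be provided by subclasses.
    def complete(): Unit
    def tryComplete(): Boolean
  }

  // Toy subclass that counts how often complete() runs.
  class CountingOperation extends DelayedOperation {
    val completions = new AtomicInteger(0)
    def complete(): Unit = completions.incrementAndGet()
    // In this toy, the completion criterion is always met.
    def tryComplete(): Boolean = forceComplete()
  }
}
```

With this shape, a second forceComplete() or a late tryComplete() returns false and leaves the completion count unchanged.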


- Jun Rao




Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Aug. 27, 2014, 5 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Incorporated Jun's comments.

1. I left some classes in Log since they are return values of some of its APIs.
2. I kept the fetch info in the delayed fetch metadata since it needs to be used for re-reading the log.
3. I kept the name "callbackOnComplete", following the principle that only the caller knows what the callback is used for. Callers can therefore name their callback responseCallback (in KafkaApis) or putCacheCallback (in OffsetManager), while every callee takes the callback as "callbackOnComplete".

Unit tests passed, with some other notes:

1. Found and fixed a bug in the current delayed fetch satisfaction logic: previously, when calculating the accumulated bytes, we did not take fetchMaxBytes into consideration as an upper limit for a single partition's log, but simply took the difference between the current HW/LEO and the fetch offset.
2. Found and fixed a bug in the unit tests: we used to create a replica manager on the fly but did not shut it down upon completing the test, which leaks the background thread (i.e. the reaper thread of the purgatory).
3. Changed the RequestPurgatory API a bit per Jun's comments. It now has two implemented functions, forceComplete() and isCompleted(), and two functions that need to be implemented in the subclasses, tryComplete() and complete(). Please let me know if people have more comments on the current API.
4. Cleaned up the SimpleFetch test: previously this test was too complicated, although it actually just tests a simple piece of the replica manager's logic.
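
The fetch satisfaction fix in note 1 can be illustrated roughly as follows. This is a simplified sketch, not the patched DelayedFetch code: PartitionFetch, availableBytes and the fetchMinBytes threshold are hypothetical names, and real offsets and log segments are reduced to plain byte counts.

```scala
object FetchBytesSketch {
  // Hypothetical per-partition view: bytes between the fetch offset and the
  // current HW/LEO, plus the per-partition limit requested by the fetcher.
  final case class PartitionFetch(availableBytes: Long, fetchMaxBytes: Int)

  // Old behaviour as described in the note: count everything that is available.
  def uncappedBytes(partitions: Seq[PartitionFetch]): Long =
    partitions.map(_.availableBytes).sum

  // Fixed behaviour: a partition contributes at most fetchMaxBytes,
  // because that is all a response could carry for it.
  def cappedBytes(partitions: Seq[PartitionFetch]): Long =
    partitions.map(p => math.min(p.availableBytes, p.fetchMaxBytes.toLong)).sum

  // The delayed fetch is satisfied once enough returnable bytes have accumulated.
  def isSatisfied(partitions: Seq[PartitionFetch], fetchMinBytes: Int): Boolean =
    cappedBytes(partitions) >= fetchMinBytes
}
```

For example, with one partition holding 1000 available bytes and another holding 50, each limited to 100 bytes, the uncapped sum is 1050 while the capped sum is 150, so a request that needs at least 200 bytes should keep waiting.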

One concern I have now is about the online creation of a new callback function (i.e. the "def" inside the handling functions and the offset manager's storeOffset function): when I run the unit tests with the patch, it seems to cause higher CPU consumption than trunk. Could someone lend another pair of eyes by running the unit tests and checking the CPU usage?
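
For readers unfamiliar with the pattern, the "def inside the handling function" looks roughly like the sketch below. All names here (appendMessages, handleProducerRequest, sendResponseCallback, the String partition keys) are hypothetical stand-ins rather than the real KafkaApis or ReplicaManager signatures; the sketch only shows that a fresh function object capturing request-local state is created on every call. Whether that explains the observed CPU difference is not established here.

```scala
object NestedCallbackSketch {
  // Hypothetical stand-in for the append path: it reports a status code
  // per partition by invoking the callback it was handed.
  def appendMessages(messages: Map[String, Int],
                     callbackOnComplete: Map[String, Short] => Unit): Unit =
    callbackOnComplete(messages.map { case (partition, _) => partition -> 0.toShort })

  def handleProducerRequest(correlationId: Int,
                            messages: Map[String, Int],
                            send: String => Unit): Unit = {
    // The nested def closes over correlationId and send. Passing it as a
    // value converts it into a new function object for each request handled.
    def sendResponseCallback(status: Map[String, Short]): Unit =
      send(s"correlationId=$correlationId status=${status.toSeq.sorted.mkString(",")}")

    appendMessages(messages, sendResponseCallback)
  }
}
```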


Diffs
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing (updated)
-------

Unit tests


Thanks,

Guozhang Wang


Re: Review Request 24676: Fix KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Aug. 27, 2014, 4:44 p.m.)


Review request for kafka.


Summary (updated)
-----------------

Fix KAFKA-1583


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

TBD


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 24676: WIP KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/Log.scala, lines 50-62
> > <https://reviews.apache.org/r/24676/diff/3/?file=666244#file666244line50>
> >
> >     Should these two classes be in ReplicaManager since they are only used there?

I originally put them in ReplicaManager, but finally decided to move them to Log since they are in the return values of the public APIs. Let me know if you have a strong preference.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, line 28
> > <https://reviews.apache.org/r/24676/diff/3/?file=666246#file666246line28>
> >
> >     fetchInfo.offset seems to be redundant with startOffsetMetadata.messageOffset. Instead of keeping fetchInfo, could we just keep fetchSize?

I originally did that as well; the reason for keeping the fetchInfo as a whole is that later on, when re-fetching the data in DelayedFetch, we have to reconstruct this fetchInfo for the readFromLocalLog() call anyway, so I just kept it along with the delayed fetch object. Let me know if you have a strong preference.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 135-140
> > <https://reviews.apache.org/r/24676/diff/3/?file=666246#file666246line135>
> >
> >     Should we use mapValues()?

I think mapValues can be used here: the first resulting map is only passed in as a parameter and will not be changed, and the second resulting map is written to the socket without modification. But we could also make it a rule never to use mapValues under any circumstances, just to avoid potential risks. Let me know what you think.
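
The potential risk with mapValues is that it returns a lazy view rather than a new map, so the mapping function is re-run on every access. A minimal sketch of that behaviour follows; the object name and keys are made up for illustration. Note that on Scala 2.13 and later, calling mapValues directly on a Map is deprecated in favour of .view.mapValues, but the re-evaluation behaviour shown here is the same.

```scala
object MapValuesSketch {
  // Returns how many times the mapping function ran for the mapValues view
  // and for a strict map, after reading the same key twice from each.
  def evaluationCounts(): (Int, Int) = {
    val source = Map("topic-0" -> 1)

    var lazyCalls = 0
    val viaMapValues = source.mapValues { v => lazyCalls += 1; v * 2 }
    viaMapValues("topic-0")
    viaMapValues("topic-0") // the function runs again on this access

    var strictCalls = 0
    val viaMap = source.map { case (k, v) => strictCalls += 1; (k, v * 2) }
    viaMap("topic-0")
    viaMap("topic-0") // plain lookups, nothing is recomputed

    (lazyCalls, strictCalls)
  }
}
```

If the mapping function is cheap and free of side effects, as argued above, the view is harmless; the surprise only arises when the function is expensive or observes mutable state.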


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 213-218
> > <https://reviews.apache.org/r/24676/diff/3/?file=666249#file666249line213>
> >
> >     Do we need the warning log here?

It was originally in ReplicaManager, and I have moved it here. Following the error handling guidelines, I now think it should not be WARN but just DEBUG. Let me know if you have some thoughts.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 165-173
> > <https://reviews.apache.org/r/24676/diff/3/?file=666249#file666249line165>
> >
> >     Do we need the warning log here?

Ditto above.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 214
> > <https://reviews.apache.org/r/24676/diff/3/?file=666252#file666252line214>
> >
> >     I think it's better to explicitly use the return clause unless this is the last statement in the method. For example, if later someone adds a return value at the end of the method, this value will no longer be returned.

Good point.
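
The pitfall behind this comment can be shown with a small sketch. The method names and the label helper are hypothetical; the point is only that in Scala the value of an if/else is the method result solely when it is the last expression.

```scala
object ExplicitReturnSketch {
  private def label(name: String): String = s"replica:$name"

  // The if/else is the last expression, so its value is the result.
  def lookupImplicit(found: Boolean): String =
    if (found) label("leader") else label("unknown")

  // After someone appends another expression, the if/else value is
  // silently discarded and the new last expression is returned instead.
  def lookupAfterEdit(found: Boolean): String = {
    if (found) label("leader") else label("unknown")
    label("default")
  }

  // An explicit return keeps the early result regardless of later edits.
  def lookupExplicit(found: Boolean): String = {
    if (found) return label("leader")
    label("default")
  }
}
```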


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 427-428
> > <https://reviews.apache.org/r/24676/diff/3/?file=666252#file666252line427>
> >
> >     This seems to be an existing problem. However, we need to make sure these stats are recorded only once when sending out the fetch response, not every time when reading from a local log.

I moved the metrics recording to KafkaApis; this may break the layered architecture a bit, though.


> On Aug. 25, 2014, 1:15 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 143-146
> > <https://reviews.apache.org/r/24676/diff/3/?file=666253#file666253line143>
> >
> >     This seems redundant given the check in line 120.

This is actually not redundant: tryComplete() will still return false if the operation is already completed (i.e. the "completed" boolean is already set to true).


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51353
-----------------------------------------------------------




Re: Review Request 24676: WIP KAFKA-1583

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review51353
-----------------------------------------------------------


Thanks for the patch. Overall, this is a really nice cleanup patch. The logic is much clearer now. Some comments below.

A minor issue is that currently we never remove any watched keys in Purgatory, even when the watch list is already empty. We probably don't want to keep removing and then adding the same watch key. However, if a key hasn't been used for some time, perhaps it's useful to remove it. We can probably file a follow-up JIRA to think this through.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/24676/#comment89613>

    This comment is not very accurate. For example, a follower's LEO change can also cause HW to increase.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/24676/#comment89614>

    Should these two classes be in ReplicaManager since they are only used there?



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment89600>

    fetchInfo.offset seems to be redundant with startOffsetMetadata.messageOffset. Instead of keeping fetchInfo, could we just keep fetchSize?



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/24676/#comment89599>

    Should we use mapValues()?



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment89601>

    "if every partition it produces to is satisfied by one of the following."



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/24676/#comment89602>

    Case B.2 seems to be in lines 97-98.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment89608>

    Do we need the warning log here?



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/24676/#comment89609>

    Do we need the warning log here?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment89612>

    Could we call this responseCallback?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment89610>

    Could this just be Map(offsetTopicPartition -> new ByteBufferMessageSet(...))?



core/src/main/scala/kafka/server/OffsetManager.scala
<https://reviews.apache.org/r/24676/#comment89611>

    Could this be just responseStatus(offsetTopicPartition)?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89603>

    To be consistent, should this be named tryCompleteDelayedFetch?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89604>

    I think it's better to explicitly use the return clause unless this is the last statement in the method. For example, if later someone adds a return value at the end of the method, this value will no longer be returned.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89605>

    Could this be merged with the first if test?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89606>

    Would it be better to rename this to fetchOnlyFromLeader throughout?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/24676/#comment89607>

    This seems to be an existing problem. However, we need to make sure these stats are recorded only once when sending out the fetch response, not every time when reading from a local log.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89598>

    It would be good to explain which methods typically need to be redefined in the subclasses and which ones are not, and group the methods accordingly.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89596>

    Should we just let expirationReaper call complete() and get rid of expire()?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89582>

    request => operation



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89583>

    request => operation



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89584>

    Edited the comment a bit as below.
    
    Note that a delayed operation can be watched on multiple keys. It is possible that an operation is completed after it has been added to the watch list for some, but not all of the keys. In this case, the operation is considered completed and won't be added to the watch list of the remaining keys. The expiration reaper thread will remove this operation from any watcher list in which the operation exists.
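
The behaviour described in this comment can be sketched as follows. This is a single-threaded toy under assumptions, not the RequestPurgatory implementation: the Op and Purgatory classes, the String keys and the completeOnAttempt trigger are all hypothetical, and the real code additionally has to deal with concurrent completion.

```scala
import scala.collection.mutable

object WatchKeysSketch {
  // Hypothetical operation whose completion criterion becomes true
  // on the given tryComplete() attempt.
  class Op(completeOnAttempt: Int) {
    private var attempts = 0
    private var done = false
    def isCompleted: Boolean = done
    def tryComplete(): Boolean = {
      attempts += 1
      if (!done && attempts >= completeOnAttempt) { done = true; true } else false
    }
  }

  class Purgatory {
    val watchers = mutable.Map.empty[String, mutable.ListBuffer[Op]]

    // Adds op to the watch list of each key in turn and re-checks completion
    // after each add. Once the operation is completed, the remaining keys
    // are skipped. Returns whether the operation ended up completed.
    def tryCompleteElseWatch(op: Op, keys: Seq[String]): Boolean = {
      val it = keys.iterator
      while (it.hasNext && !op.isCompleted) {
        watchers.getOrElseUpdate(it.next(), mutable.ListBuffer.empty[Op]) += op
        op.tryComplete()
      }
      op.isCompleted
    }

    // What a reaper pass would do: drop completed operations from every
    // watch list they are still in. Returns the number of entries removed.
    def purgeCompleted(): Int = {
      var purged = 0
      for (list <- watchers.values) {
        val completed = list.filter(_.isCompleted)
        list --= completed
        purged += completed.size
      }
      purged
    }
  }
}
```

With keys a, b and c and an operation that completes on its second check, the operation is watched under a and b only, and a later purge removes those two stale entries.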



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89585>

    This seems redundant given the check in line 120.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89594>

    Would it be better to call this tryCompleteWatched()?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89595>

    Do we need to synchronize on curr?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/24676/#comment89597>

    Would it be better to encapsulate this as a function forceComplete() in DelayedRequest? forceComplete() will set the internal completed flag from false to true and, if successful, will call complete(). Then, we don't need to return curr back and we can just get rid of DelayedRequest.expire(). We can also make the completed flag private.


- Jun Rao




Re: Review Request 24676: WIP KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Aug. 21, 2014, 6:33 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Changes include:

1. Remove the FetchRequestPurgatory/ProducerRequestPurgatory classes and move their satisfaction-checking logic into DelayedFetch/DelayedProduce.
2. Change the base RequestPurgatory API to tryCompleteElseWatch and checkAndComplete.
3. Change the base DelayedRequest API to complete, expire, tryComplete (which calls complete upon success), and isCompleted.
4. Move the updatingReplicaLEO function from ReplicaManager into Partition.
5. Move appendMessageToLocalLog and readMessages from KafkaApis to ReplicaManager.
6. OffsetManager still uses a nested callback for putting offsets into the cache while interacting with ReplicaManager.
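
To make the API shape in items 2 and 3 easier to discuss, here is a rough sketch of how the pieces could fit together. It is not the code in the diff: only the method names tryCompleteElseWatch, checkAndComplete, complete, expire, tryComplete and isCompleted come from the list above. The hooks canComplete, onComplete and onExpire, the watched helper, the key type and the locking scheme are all assumptions made for illustration.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical base class: method names follow the description, bodies are assumptions.
abstract class DelayedRequest {
  private val completed = new AtomicBoolean(false)

  // Assumed subclass hooks: the completion condition and the actions to run.
  protected def canComplete: Boolean
  protected def onComplete(): Unit
  protected def onExpire(): Unit = ()

  def isCompleted: Boolean = completed.get()

  // Runs onComplete at most once, even when several threads race to complete.
  def complete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false

  // Checks the condition and calls complete upon success.
  def tryComplete(): Boolean = canComplete && complete()

  // Timeout path: force completion, then run the expiry hook if this call won.
  def expire(): Boolean = {
    val forced = complete()
    if (forced) onExpire()
    forced
  }
}

// Hypothetical purgatory keyed by K; one coarse lock keeps the sketch simple.
class RequestPurgatory[K] {
  private val watchers = mutable.Map.empty[K, mutable.ListBuffer[DelayedRequest]]

  // Returns true if the request completed right away; otherwise watches it on every key.
  def tryCompleteElseWatch(request: DelayedRequest, keys: Seq[K]): Boolean =
    if (request.tryComplete()) true
    else {
      synchronized {
        keys.foreach { k =>
          watchers.getOrElseUpdate(k, mutable.ListBuffer.empty[DelayedRequest]) += request
        }
      }
      false
    }

  // Re-checks every request watched on the key; returns how many this call completed.
  def checkAndComplete(key: K): Int = synchronized {
    watchers.get(key) match {
      case None => 0
      case Some(list) =>
        val newlyCompleted = list.count(_.tryComplete())
        // Drop everything that is done, including requests completed via another key.
        val remaining = list.filterNot(_.isCompleted)
        if (remaining.isEmpty) watchers.remove(key) else watchers.update(key, remaining)
        newlyCompleted
    }
  }

  // Number of requests currently watched on the key (helper added for the sketch).
  def watched(key: K): Int = synchronized { watchers.get(key).map(_.size).getOrElse(0) }
}

object PurgatoryDemo {
  def main(args: Array[String]): Unit = {
    var acked = 0
    val produce = new DelayedRequest {
      protected def canComplete: Boolean = acked >= 2
      protected def onComplete(): Unit = println("response sent")
    }
    val purgatory = new RequestPurgatory[String]
    purgatory.tryCompleteElseWatch(produce, Seq("topic-0")) // not satisfied yet, so watched
    acked = 2
    purgatory.checkAndComplete("topic-0")                   // condition now holds
  }
}
```

The point of the sketch is that the completion condition lives in the delayed request itself, so the purgatory only needs to know when to re-check, which is roughly what item 1 is after.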

Would like reviews on:

1. RequestPurgatory and DelayedRequest API and implementation.
2. DelayedFetch/Produce API and implementation.
3. ReplicaManager's readMessages/appendMessages API.
4. OffsetManager's storeOffsets API.
5. KafkaApis.


Diffs
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 24676: WIP KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Aug. 21, 2014, 6:30 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

TBD


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 24676: WIP KAFKA-1583

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
-----------------------------------------------------------

(Updated Aug. 20, 2014, 9 p.m.)


Review request for kafka.


Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583


Repository: kafka


Description (updated)
-------

Compilable WIP patch.

Changes include:

1. Remove the FetchRequestPurgatory/ProducerRequestPurgatory classes and move their satisfaction-checking logic into DelayedFetch/DelayedProduce.
2. Change the base RequestPurgatory API to tryCompleteElseWatch and checkAndComplete.
3. Change the base DelayedRequest API to complete, expire, tryComplete (which calls complete upon success), and isCompleted.
4. Move the updatingReplicaLEO function from ReplicaManager into Partition.
5. Move appendMessageToLocalLog and readMessages from KafkaApis to ReplicaManager.
6. OffsetManager still uses a nested callback for putting offsets into the cache while interacting with ReplicaManager.

Would like reviews on:
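
For item 6, the nested callback could look roughly like the sketch below. This is an illustration only, not the code in the diff: the names storeOffsets and appendMessages come from this description, while the classes ReplicaManagerSketch and OffsetManagerSketch, the parameter types, the string message format, the getOffset helper and the always-successful append are assumptions.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the append path of ReplicaManager.
class ReplicaManagerSketch {
  // Appends the messages and reports a per-partition error code through the callback.
  def appendMessages(messages: Map[String, String],
                     responseCallback: Map[String, Short] => Unit): Unit = {
    // Assumption: every append succeeds, so each partition gets error code 0.
    responseCallback(messages.map { case (partition, _) => partition -> 0.toShort })
  }
}

// Hypothetical stand-in for OffsetManager.
class OffsetManagerSketch(replicaManager: ReplicaManagerSketch) {
  private val cache = mutable.Map.empty[(String, String), Long]

  def storeOffsets(group: String,
                   offsets: Map[String, Long],
                   responseCallback: Map[String, Short] => Unit): Unit = {
    val messages = offsets.map { case (partition, offset) => partition -> s"$group:$offset" }

    // Nested callback: runs once the append has finished, updates the cache for
    // the partitions that succeeded, then hands the status to the caller's callback.
    def putCacheCallback(status: Map[String, Short]): Unit = {
      status.foreach { case (partition, errorCode) =>
        if (errorCode == 0) cache.put((group, partition), offsets(partition))
      }
      responseCallback(status)
    }

    replicaManager.appendMessages(messages, putCacheCallback)
  }

  // Lookup helper added for the sketch.
  def getOffset(group: String, partition: String): Option[Long] =
    cache.get((group, partition))
}

object OffsetDemo {
  def main(args: Array[String]): Unit = {
    val offsetManager = new OffsetManagerSketch(new ReplicaManagerSketch)
    offsetManager.storeOffsets("group-a", Map("topic-0" -> 42L),
      status => println(s"commit status: $status"))
    println(offsetManager.getOffset("group-a", "topic-0"))
  }
}
```

The caller's callback only fires after the cache update, which is why the callback has to be nested rather than passed straight through to the append call.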

1. RequestPurgatory and DelayedRequest API and implementation.
2. DelayedFetch/Produce API and implementation.
3. ReplicaManager's readMessages/appendMessages API.
4. OffsetManager's storeOffsets API.
5. KafkaApis.


Diffs
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd 
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe 
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b 
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b 
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb 
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f 
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 

Diff: https://reviews.apache.org/r/24676/diff/


Testing
-------


Thanks,

Guozhang Wang