Posted to dev@kafka.apache.org by Guozhang Wang <gu...@linkedin.com> on 2014/07/21 21:51:19 UTC

Review Request 23767: Fix KAFKA-1430

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description
-------

Rebased on KAFKA-1462: 1. LogSegment.read() now also returns fetch info, even if the corresponding message set is empty; 2. The purgatory now checks satisfaction synchronously in checkAndMaybeWatch, which will only return false if this thread successfully set the satisfied bit to true; 3. Removed the read lock on Partition's reads of leaderOpt and epoch and made them volatile instead, since each of these two functions is just a single read; 4. Fixed some minor issues in TestEndToEndLatency; 5. Other minor fixes.
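
Item 2 depends on an atomic test-and-set, so that exactly one thread gets to mark a request as satisfied. A minimal sketch of that idea follows, assuming a hypothetical SketchDelayedRequest class and trySatisfy method; neither name is taken from the patch, and the sketch does not reproduce the return-value convention of checkAndMaybeWatch.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a delayed request; not the class from the patch.
class SketchDelayedRequest {
  private val satisfied = new AtomicBoolean(false)

  // Returns true only for the single caller that flips the bit from false to true;
  // every later (or concurrent losing) caller gets false.
  def trySatisfy(): Boolean = satisfied.compareAndSet(false, true)

  def isSatisfied: Boolean = satisfied.get
}
```

Because compareAndSet is atomic, a checker thread and an expiration thread racing on the same request cannot both believe they satisfied it.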


Diffs
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23767/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.

> On Aug. 1, 2014, 5:08 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/api/FetchResponse.scala, lines 28-29
> > <https://reviews.apache.org/r/23767/diff/5/?file=647101#file647101line28>
> >
> >     Since we expose simple consumer as part of the api, this renaming is actually an api change. It would be good if we kept the old api unchanged.

I agree that simple consumer and fetch responses are exposed, but I am not sure whether this class is exposed as well. Let's discuss this offline.


> On Aug. 1, 2014, 5:08 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 101-102
> > <https://reviews.apache.org/r/23767/diff/5/?file=647102#file647102line101>
> >
> >     This doesn't look quite right. When creating Replica, we initialize the offset of HW to what's in the checkpoint file. However, the other metadata (position and base offset) in LogOffsetMetadata is not initialized properly. Would it be better to let Replica take LogOffsetMetadata instead of long for HW?

This is actually fine: only the leader replica needs the full HW metadata, which is constructed in Partition.makeLeader(). For other replicas, just the message offset is sufficient.


> On Aug. 1, 2014, 5:08 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Replica.scala, lines 103-105
> > <https://reviews.apache.org/r/23767/diff/5/?file=647103#file647103line103>
> >
> >     It's possible that HW is out of the range of the log. In this case, convertToOffsetMetadata() will throw an OffsetOutOfRangeException. When this happens, we should probably just set HW to 0 since we are not sure what the HW should be.

Thinking about it, convertToOffsetMetadata() never returned LogOffsetMetadata.UnknownOffsetMetadata before. So we can catch the exception thrown by convertToOffsetMetadata() and return UnknownOffsetMetadata, which is the same as setting HW to -1.
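
A minimal sketch of that fallback, under stated assumptions: all types below are simplified, hypothetical stand-ins (the Sketch-prefixed names and the toy position arithmetic are not from the patch), and only the catch-and-return-unknown shape reflects the reply above.

```scala
// Hypothetical, simplified offset metadata: message offset plus the segment
// base offset and position that the review comments refer to.
case class SketchOffsetMetadata(messageOffset: Long,
                                segmentBaseOffset: Long,
                                relativePosition: Int)

object SketchOffsetMetadata {
  // Placeholder used when the real location in the log cannot be determined.
  val Unknown = SketchOffsetMetadata(-1L, -1L, -1)
}

class SketchOffsetOutOfRangeException(msg: String) extends RuntimeException(msg)

// Toy log covering offsets [startOffset, endOffset] in a single segment.
class SketchLog(startOffset: Long, endOffset: Long) {
  def convertToOffsetMetadata(offset: Long): SketchOffsetMetadata = {
    if (offset < startOffset || offset > endOffset)
      throw new SketchOffsetOutOfRangeException(s"offset $offset is out of range")
    // Toy position: distance from the segment base, not a real byte position.
    SketchOffsetMetadata(offset, startOffset, (offset - startOffset).toInt)
  }
}

object HighWatermarkSketch {
  // Fall back to the unknown placeholder instead of propagating the exception.
  def convertHW(log: SketchLog, hw: Long): SketchOffsetMetadata =
    try log.convertToOffsetMetadata(hw)
    catch { case _: SketchOffsetOutOfRangeException => SketchOffsetMetadata.Unknown }
}
```

With this shape, a caller can tell "HW not locatable in the log" apart from a valid HW by comparing against the placeholder rather than handling an exception.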


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review49317
-----------------------------------------------------------


On July 31, 2014, 10:04 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 31, 2014, 10:04 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Address Jun's comments round two: 1. I ended up not adding the read lock on Partition.leaderReplicaIfLocal, since not only delayed fetch but also delayed produce requests need to call this function in checkSatisfied. On the other hand, an inconsistent read should be harmless in all corner cases; a detailed explanation is in the JIRA comments; 2. I kept the renaming to PartitionData since that name is not otherwise used in core; this class will only be used by FetchResponse.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 9abf219f0efb1a020db9a6623e1672a1affd5cfc 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 23767: Fix KAFKA-1430

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review49317
-----------------------------------------------------------



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment86280>

    If we want to satisfy the request immediately in this case, shouldn't we return true here?



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/23767/#comment86281>

    Since we expose simple consumer as part of the api, this renaming is actually an api change. It would be good if we kept the old api unchanged.



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment86347>

    This doesn't look quite right. When creating Replica, we initialize the offset of HW to what's in the checkpoint file. However, the other metadata (position and base offset) in LogOffsetMetadata is not initialized properly. Would it be better to let Replica take LogOffsetMetadata instead of long for HW?



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/23767/#comment86343>

    It's possible that HW is out of the range of the log. In this case, convertToOffsetMetadata() will throw an OffsetOutOfRangeException. When this happens, we should probably just set HW to 0 since we are not sure what the HW should be.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment86340>

    We need to describe what offset is returned in FetchDataInfo. This will be the offset >= startOffset.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment86344>

    We need to describe what offset is returned. This will be the offset >= startOffset. Also, we need to explain what happens if the offset is out of range.
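
One possible reading of "the offset >= startOffset" is sketched below, assuming a hypothetical in-memory list of stored offsets and a hypothetical SketchFetchInfo result type. Returning None for an out-of-range start offset is purely an illustration choice; what the real read path does in that case is exactly what this comment asks to have documented.

```scala
object ReadSketch {
  // Hypothetical result type: the offset actually found plus the offsets read.
  case class SketchFetchInfo(fetchOffset: Long, offsets: Seq[Long])

  // `stored` holds the offsets present in the log, in ascending order.
  // The returned fetchOffset is the first stored offset >= startOffset,
  // which may be larger than startOffset when there are gaps.
  def read(stored: Seq[Long], startOffset: Long): Option[SketchFetchInfo] = {
    val tail = stored.dropWhile(_ < startOffset)
    tail.headOption.map(first => SketchFetchInfo(first, tail))
  }
}
```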



core/src/main/scala/kafka/log/LogSegment.scala
<https://reviews.apache.org/r/23767/#comment86339>

    We need to describe what offset is returned in FetchDataInfo. This will be the offset >= startOffset.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment86350>

    If we want to satisfy it immediately, should we return true here?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment86345>

    This is really the offsetDiff. With log compaction, not every offset has a message.
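
To illustrate the distinction, here is a small sketch with hypothetical helper names: once compaction has removed some messages, the difference between two offsets is only an upper bound on the number of messages between them.

```scala
object OffsetDiffSketch {
  // Difference between two offsets; with compaction this can exceed the
  // number of messages actually stored in [from, to).
  def offsetDiff(from: Long, to: Long): Long = to - from

  // Actual number of stored messages whose offset falls in [from, to).
  def messageCount(stored: Seq[Long], from: Long, to: Long): Int =
    stored.count(o => o >= from && o < to)
}
```

For a compacted log that retains only offsets 0, 3, 7 and 8, the offset difference between 0 and 8 is 8 while only three messages lie in that range.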



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment86351>

    Probably use match here too?



core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
<https://reviews.apache.org/r/23767/#comment86349>

    Is that to address the problem that the first message could be delayed due to the starting overhead in the consumer fetcher threads? Perhaps we can include that in the comment.


- Jun Rao




Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/
-----------------------------------------------------------

(Updated Aug. 5, 2014, 9:54 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

Address Jun's comments round three: change PartitionData back to FetchResponsePartitionData


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 9abf219f0efb1a020db9a6623e1672a1affd5cfc 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23767/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/
-----------------------------------------------------------

(Updated July 31, 2014, 10:04 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

Address Jun's comments round two: 1. I ended up not adding the read lock on Partition.leaderReplicaIfLocal, since not only delayed fetch but also delayed produce requests need to call this function in checkSatisfied. On the other hand, an inconsistent read should be harmless in all corner cases; a detailed explanation is in the JIRA comments; 2. I kept the renaming to PartitionData since that name is not otherwise used in core; this class will only be used by FetchResponse.


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 9abf219f0efb1a020db9a6623e1672a1affd5cfc 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23767/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.

> On July 31, 2014, 4:40 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/api/FetchResponse.scala, lines 28-29
> > <https://reviews.apache.org/r/23767/diff/4/?file=643725#file643725line28>
> >
> >     Do we want to rename this? There are lots of PartitionData and this one is specific to FetchResponse.

There is no other PartitionData in core, so I renamed it to PartitionData; it will be the only PartitionData there, used by FetchResponse.


> On July 31, 2014, 4:40 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedProduce.scala, lines 110-111
> > <https://reviews.apache.org/r/23767/diff/4/?file=643735#file643735line110>
> >
> >     Does this need to be volatile since it can be read/written from different threads?

Good point.
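
A minimal sketch of the concern, assuming a hypothetical status holder (the class and field names are illustrative, not taken from DelayedProduce): a field written by one thread and read by another is marked @volatile so that the write becomes visible to later reads on other threads.

```scala
// Hypothetical status holder; names are assumptions for illustration only.
class SketchPartitionStatus {
  // @volatile makes a write by one thread visible to subsequent reads by others.
  @volatile var acksPending: Boolean = true
}

object VolatileSketch {
  // A writer thread clears the flag and the caller reads it afterwards.
  // join() is used here only so the example finishes deterministically; in
  // long-running code with no such synchronization point, @volatile is what
  // guarantees that the reader eventually sees the update.
  def clearOnOtherThread(status: SketchPartitionStatus): Boolean = {
    val writer = new Thread(new Runnable {
      def run(): Unit = { status.acksPending = false }
    })
    writer.start()
    writer.join()
    status.acksPending
  }
}
```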


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48944
-----------------------------------------------------------


On July 28, 2014, 6:30 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 28, 2014, 6:30 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Address Jun's comments: 1. Kept the first comment about removing the read lock on leaderReplicaIfLocal for further discussion; 2. Kept the comment on whether to satisfy a delayed fetch immediately if one partition has an error for further discussion; 3. Rebased on the KAFKA-1542 follow-up.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/MetadataRequestProducer.scala PRE-CREATION 
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 23767: Fix KAFKA-1430

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48944
-----------------------------------------------------------



core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/23767/#comment85769>

    Do we want to rename this? There are lots of PartitionData and this one is specific to FetchResponse.



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/23767/#comment86111>

    How about we rename this to convertHWToLocalOffsetMetadata()?



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment86124>

    In this case, would it be better to satisfy the request immediately?



core/src/main/scala/kafka/server/DelayedProduce.scala
<https://reviews.apache.org/r/23767/#comment86125>

    Does this need to be volatile since it can be read/written from different threads?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment86129>

    Would it be better to use match here?



core/src/main/scala/kafka/tools/MetadataRequestProducer.scala
<https://reviews.apache.org/r/23767/#comment86132>

    Does this need to be included?


- Jun Rao


On July 28, 2014, 6:30 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 28, 2014, 6:30 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Address Jun's comments: 1. Kept the first comment about removing readlock on leaderReplicaIfLocal for further discussion; 2. Kept the comment on whether to satisfy a delayed fetch immediately if one partition has an error for further discussion; 3. Rebased on KAFKA-1542 follow-up.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/MetadataRequestProducer.scala PRE-CREATION 
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/
-----------------------------------------------------------

(Updated July 28, 2014, 6:30 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

Address Jun's comments: 1. Kept the first comment about removing readlock on leaderReplicaIfLocal for further discussion; 2. Kept the comment on whether to satisfy a delayed fetch immediately if one partition has an error for further discussion; 3. Rebased on KAFKA-1542 follow-up.


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/tools/MetadataRequestProducer.scala PRE-CREATION 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23767/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 23767: Fix KAFKA-1430

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48809
-----------------------------------------------------------


All new classes such as ProducerRequestPurgatory and FetchRequestPurgatory seem to be missing in v3 of the patch.

- Jun Rao


On July 25, 2014, 4:52 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 25, 2014, 4:52 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Address Jun's comments: 1. LogSegment.read() will also return fetch info, even if the corresponding message set is empty; 2. Purgatory checks satisfaction synchronously in checkAndMaybeWatch, and will only return false if this thread successfully set the satisfied bit to true; 3. Remove the read lock on Partition's reads of leaderOpt and epoch and make them volatile instead, since these two functions each do just a single read; 4. Fix some minor issues in TestEndToEndLatency; 5. Other minor fixes
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/
-----------------------------------------------------------

(Updated July 25, 2014, 4:52 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

Address Jun's comments: 1. LogSegment.read() will also return fetch info, even if the corresponding message set is empty; 2. Purgatory checks satisfaction synchronously in checkAndMaybeWatch, and will only return false if this thread successfully set the satisfied bit to true; 3. Remove the read lock on Partition's reads of leaderOpt and epoch and make them volatile instead, since these two functions each do just a single read; 4. Fix some minor issues in TestEndToEndLatency; 5. Other minor fixes


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23767/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.

> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 120-130
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line120>
> >
> >     We may not be able to remove the readlock here. The issue is that this method accesses not only leaderReplicaIdOpt, but also other internal data structures like assignedReplicaMap. Without the lock, the read from the Map could fail if it's being concurrently modified.
> >     
> >     In general, we can get away without the lock only if we want to read a single internal value. Perhaps we can introduce another function isLeaderLocal() that returns a boolean. This method will only need to access leaderReplicaIdOpt. Then all callers will first call leaderReplicaIfLocal and hold onto the leader replica. They can then use isLeaderLocal to see if the leader has changed subsequently.

Would this be the same as what we do now? In getReplica(localBrokerId), if the replica map has changed and the id is no longer in the map, it will return None; if the replica map has changed and the id is no longer the leader, it is just the same as if we had called leaderReplicaIfLocal() to get the leader and the leader changed immediately afterwards?
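
For reference, the isLeaderLocal() idea from the comment above could look roughly like the sketch below. This is only an illustration under stated assumptions: PartitionSketch, makeLeader and the String-valued replica map are hypothetical stand-ins, not the code in Partition.scala. The method that touches two fields keeps the read lock, while the method that reads a single volatile field does not need it.

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

object LeaderCheckSketch {
  // Hypothetical, simplified stand-in for kafka.cluster.Partition; not the real class.
  final class PartitionSketch(localBrokerId: Int) {
    private val lock = new ReentrantReadWriteLock()
    @volatile private var leaderReplicaIdOpt: Option[Int] = None
    // brokerId -> replica name (the real map holds Replica objects)
    private var assignedReplicaMap: Map[Int, String] = Map.empty

    def makeLeader(leaderId: Int, replicas: Map[Int, String]): Unit = {
      val w = lock.writeLock()
      w.lock()
      try {
        assignedReplicaMap = replicas
        leaderReplicaIdOpt = Some(leaderId)
      } finally w.unlock()
    }

    // Reads two fields, so it takes the read lock to see a consistent pair.
    def leaderReplicaIfLocal(): Option[String] = {
      val r = lock.readLock()
      r.lock()
      try {
        leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(assignedReplicaMap.get)
      } finally r.unlock()
    }

    // A single volatile read, so no lock is taken.
    def isLeaderLocal: Boolean = leaderReplicaIdOpt.contains(localBrokerId)
  }
}
```

A caller would hold onto the result of leaderReplicaIfLocal() and later call isLeaderLocal to detect a leadership change.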


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 268-269
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line268>
> >
> >     Instead of using negation, could we do leaderHW.precedes(replica.logEndOffset)?
> >     
> >     Also, could we move && to the previous line?

We cannot simply use leaderHW.precedes(replica.logEndOffset) since we need to consider ">="; I will use messagesDiff instead.
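
To make the ">=" point concrete, here is a small sketch. It assumes that precedes is a strict "less than" on message offsets and that messagesDiff is a plain offset difference; OffsetMeta and caughtUp are hypothetical names used only for illustration, not the classes in the patch.

```scala
object PrecedesSketch {
  // Hypothetical stand-in for LogOffsetMetadata; only the message offset is modelled.
  final case class OffsetMeta(messageOffset: Long) {
    // True only if this offset is strictly earlier than `that`.
    def precedes(that: OffsetMeta): Boolean = messageOffset < that.messageOffset

    // Offset difference (negative if this offset is behind `that`).
    def messagesDiff(that: OffsetMeta): Long = messageOffset - that.messageOffset
  }

  // True when the replica has reached the leader HW, including the equal case.
  def caughtUp(replicaLEO: OffsetMeta, leaderHW: OffsetMeta): Boolean =
    replicaLEO.messagesDiff(leaderHW) >= 0
}
```

With both offsets at 10, leaderHW.precedes(replicaLEO) is false even though the replica has caught up, whereas the messagesDiff form is true.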


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Replica.scala, lines 33-36
> > <https://reviews.apache.org/r/23767/diff/2/?file=637562#file637562line33>
> >
> >     Should we rename highWatermarkValue and logEndOffsetValue to highWatermarkMetadata and logEndOffsetMetadata?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 372-373
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line372>
> >
> >     Move && to previous line?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 324-325
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line324>
> >
> >     Perhaps we could create TopicPartionRequestKey just once?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 296-297
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line296>
> >
> >     This seems to be an existing problem. If ack=-1, a safer check is HW >= requiredOffset. This way, we will be sure that if ISR expands, the acked message is guaranteed to be in the replicas newly added to ISR.
> >     
> >     The following is an example that shows the issue with the existing check. Suppose that all replicas in ISR are at offset 10, but HW is still at 8 and we call checkEnoughReplicasReachOffset on offset 9.  The check will be satisfied and the message is considered committed. We will be updating HW to 10 pretty soon. However, before that happens, another replica whose LEO is at 8 can be added to ISR. This replica won't have message 9, which is acked as committed.

Fixed.
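
The scenario in the comment above can be replayed with a small sketch. It assumes the existing check compares every ISR replica's log end offset against requiredOffset; allIsrCaughtUp and hwReached are hypothetical helper names, not methods in Partition.scala.

```scala
object AckCheckSketch {
  // Assumed shape of the existing check: every ISR replica's LEO has reached requiredOffset.
  def allIsrCaughtUp(isrLogEndOffsets: Seq[Long], requiredOffset: Long): Boolean =
    isrLogEndOffsets.forall(_ >= requiredOffset)

  // Safer check suggested in the review: the high watermark has reached requiredOffset.
  def hwReached(highWatermark: Long, requiredOffset: Long): Boolean =
    highWatermark >= requiredOffset
}
```

With ISR replicas at offset 10, HW at 8 and a required offset of 9, allIsrCaughtUp passes while hwReached does not. The request is therefore not acked until the HW has actually advanced, which closes the window in which a replica with LEO 8 could join the ISR.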


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 194-195
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line194>
> >
> >     It's possible that we get an UnknownOffsetMetadata during the conversion. In this case, we probably should set HW to logEndOffset.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/Log.scala, lines 173-174
> > <https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line173>
> >
> >     It would be a bit confusing to reason about the consistency btw nextOffset and nextOffsetMetadata since they are not updated atomically. Could we just keep nextOffsetMetadata?

Good point. Fixed.
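
As an illustration of keeping only nextOffsetMetadata, the sketch below holds a single volatile reference to an immutable value, so readers always see an offset and a file position that belong together. LogSketch, append and the field names are hypothetical and much simpler than Log.scala.

```scala
object NextOffsetSketch {
  // Hypothetical field names; an immutable value that is replaced as a whole.
  final case class LogOffsetMetadata(
      messageOffset: Long,
      segmentBaseOffset: Long,
      relativePositionInSegment: Int)

  final class LogSketch {
    @volatile private var nextOffsetMetadata = LogOffsetMetadata(0L, 0L, 0)

    def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata

    // Derived from the single source of truth instead of a second field.
    def logEndOffset: Long = nextOffsetMetadata.messageOffset

    def append(messageCount: Int, sizeInBytes: Int): Unit = synchronized {
      val current = nextOffsetMetadata
      // One reference assignment publishes the new offset and position together.
      nextOffsetMetadata = current.copy(
        messageOffset = current.messageOffset + messageCount,
        relativePositionInSegment = current.relativePositionInSegment + sizeInBytes)
    }
  }
}
```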


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/log/Log.scala, lines 429-431
> > <https://reviews.apache.org/r/23767/diff/2/?file=637565#file637565line429>
> >
> >     Perhaps we can add a bit more detail to the comment. So, we are in the situation that the startOffset is in range, but we can't find a single message whose offset is >= startOffset. One possibility seems to be that all messages after startOffset have been deleted due to compaction. Is that the only case? Let's describe all situations when this can happen.

Great catch. I was originally thinking this can only happen when the regular consumer's fetch request is issued with a max offset set to the HW, and all messages beyond the start offset are also beyond the max offset. But later I realized that in this case LogSegment.read() will not actually return null. So the only case will be compaction, and in this case we cannot return unknown offset metadata, but instead we can return log end offset metadata.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 27-38
> > <https://reviews.apache.org/r/23767/diff/2/?file=637569#file637569line27>
> >
> >     In DelayedProduce, we don't send the response immediately if one partition has an error. Should we do the same thing for DelayedFetch? Will that make the logic simpler?

Sure we can, but it will not simplify the logic. I did this to follow the old behavior, which is that when a partition is no longer hosting the leader or is no longer known, we return the fetch response immediately with whatever is available. Let me know if you have a strong preference.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 61-63
> > <https://reviews.apache.org/r/23767/diff/2/?file=637569#file637569line61>
> >
> >     Will this case ever happen? If so, could we add a comment how this can happen?

Comment added.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/FetchRequestPurgatory.scala, lines 41-42
> > <https://reviews.apache.org/r/23767/diff/2/?file=637572#file637572line41>
> >
> >     This can be private.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 448-449
> > <https://reviews.apache.org/r/23767/diff/2/?file=637573#file637573line448>
> >
> >     If ack is >1, it won't be -1. So "but no = -1" is redundant.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 25-26
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line25>
> >
> >     Would it be better to name it offsetOrdering?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 36-37
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line36>
> >
> >     The ordering doesn't match that in the signature.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 41-42
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line41>
> >
> >     Should we just use one constructor with defaults?

Fixed.
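
For illustration, a single constructor with defaults could look like the sketch below. The sentinel values, field names and the offsetOnly helper are assumptions made for this example, not necessarily what LogOffsetMetadata.scala contains.

```scala
object DefaultsSketch {
  // Hypothetical sentinels meaning "segment information not known".
  val UnknownSegBaseOffset: Long = -1L
  val UnknownFilePosition: Int = -1

  // One constructor; callers that only know the message offset omit the rest.
  final case class LogOffsetMetadata(
      messageOffset: Long,
      segmentBaseOffset: Long = UnknownSegBaseOffset,
      relativePositionInSegment: Int = UnknownFilePosition) {

    // True when only the message offset is known.
    def offsetOnly: Boolean =
      segmentBaseOffset == UnknownSegBaseOffset &&
        relativePositionInSegment == UnknownFilePosition
  }
}
```

Both LogOffsetMetadata(5L) and LogOffsetMetadata(5L, 0L, 128) then go through the same constructor.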


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/DelayedFetch.scala, lines 40-44
> > <https://reviews.apache.org/r/23767/diff/2/?file=637569#file637569line40>
> >
> >     All those should probably be private.

I cannot, since otherwise the compiler reports "illegal combination of modifiers: private and override for: value ... ".


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/LogOffsetMetadata.scala, lines 60-61
> > <https://reviews.apache.org/r/23767/diff/2/?file=637574#file637574line60>
> >
> >     Do we need the space after !?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala, lines 44-45
> > <https://reviews.apache.org/r/23767/diff/2/?file=637576#file637576line44>
> >
> >     This can be private.

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 262-263
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line262>
> >
> >     TopicAndPartition -> (PartitionData, OffsetMetadata)
> >     
> >     It probably will be clearer if we use a case class PartitionDataAndOffsetMetadata, instead of using a pair.

Fixed.
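
A sketch of why the case class reads better than a pair: fields are accessed by name rather than by _1 and _2. Every type here is a simplified, hypothetical stand-in for the real class, and highWatermarks is only an example consumer.

```scala
object FetchResultSketch {
  // Simplified stand-ins; the real classes carry more fields.
  final case class TopicAndPartition(topic: String, partition: Int)
  final case class PartitionData(error: Short, highWatermark: Long)
  final case class OffsetMetadata(messageOffset: Long)

  // Named wrapper instead of a (PartitionData, OffsetMetadata) tuple.
  final case class PartitionDataAndOffsetMetadata(data: PartitionData, offset: OffsetMetadata)

  // Callers read result.data and result.offset rather than _1 and _2.
  def highWatermarks(
      results: Map[TopicAndPartition, PartitionDataAndOffsetMetadata]): Map[TopicAndPartition, Long] =
    results.map { case (tp, result) => tp -> result.data.highWatermark }
}
```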


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 298-299
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line298>
> >
> >     Perhaps it will be clearer if we return a FetchResponseAndOffsetMetadata instead of a pair?

As above.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 592-593
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line592>
> >
> >     Would updateReplicaLEOAndHW be enough?

It should really be updateReplicaLEOAndPartitionHW.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, lines 603-604
> > <https://reviews.apache.org/r/23767/diff/2/?file=637578#file637578line603>
> >
> >     Do we need to log (topic, partition) twice?

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestKey.scala, lines 25-26
> > <https://reviews.apache.org/r/23767/diff/2/?file=637579#file637579line25>
> >
> >     Should we rename this to DelayedRequestKey?

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 152-153
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line152>
> >
> >     key doesn't seem to be used.

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 48-49
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line48>
> >
> >     These comments may need to be changed according to the comments below.

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 93-94
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line93>
> >
> >     I actually think it's more intuitive to return true if the request is satisfied by the caller. Then, we can assign a meaningful return val in the caller.
> >     
> >     val isSatisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(request)
> >     
> >     Also, we need to explain this method better. How about the following comment?
> >     
> >     Potentially add the request for watch on all keys. Return true iff the request is satisfied and the satisfaction is done by the caller.

Ack.
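
A minimal sketch of the suggested contract, where true means "satisfied, and by this caller". It assumes an atomic flag guards satisfaction; DelayedRequest, Purgatory, isReady and watchedCount are hypothetical, far simpler than RequestPurgatory.scala, and leave out expiration and re-checks.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

object PurgatorySketch {
  // `isReady` stands in for the real satisfaction criterion of a delayed request.
  final class DelayedRequest(val keys: Seq[String], isReady: () => Boolean) {
    val satisfied = new AtomicBoolean(false)
    def checkSatisfied(): Boolean = isReady()
  }

  final class Purgatory {
    private val watchers = mutable.Map.empty[String, mutable.ListBuffer[DelayedRequest]]

    // Potentially add the request for watch on all keys. Return true iff the
    // request is satisfied and the satisfaction is done by the caller.
    def checkAndMaybeWatch(request: DelayedRequest): Boolean = synchronized {
      if (request.checkSatisfied()) {
        // Only the thread that flips the flag from false to true gets `true`.
        request.satisfied.compareAndSet(false, true)
      } else {
        request.keys.foreach { key =>
          watchers.getOrElseUpdate(key, mutable.ListBuffer.empty[DelayedRequest]) += request
        }
        false
      }
    }

    def watchedCount(key: String): Int = synchronized {
      watchers.get(key).map(_.size).getOrElse(0)
    }
  }
}
```

The caller can then write val isSatisfiedByMe = purgatory.checkAndMaybeWatch(request) and send the response only when it is true.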


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 155-156
> > <https://reviews.apache.org/r/23767/diff/2/?file=637580#file637580line155>
> >
> >     Could we name this checkAndMaybeAdd and add the comment?

Ack.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala, lines 88-89
> > <https://reviews.apache.org/r/23767/diff/2/?file=637593#file637593line88>
> >
> >     Can this just be created as Map(a -> b)?

Fixed.


> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 287-288
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line287>
> >
> >     Not sure if we need to copy since inSyncReplicas is immutable. We probably just need to do a reference assignment.

Ah you are right.


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48283
-----------------------------------------------------------


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.

> On July 22, 2014, 5:02 p.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Partition.scala, lines 120-130
> > <https://reviews.apache.org/r/23767/diff/2/?file=637561#file637561line120>
> >
> >     We may not be able to remove the readlock here. The issue is that this method accesses not only leaderReplicaIdOpt, but also other internal data structures like assignedReplicaMap. Without the lock, the read from the map could fail if it's being concurrently modified.
> >     
> >     In general, we can get away without the lock only if we want to read a single internal value. Perhaps we can introduce another function isLeaderLocal() that returns a boolean. This method will only need to access leaderReplicaIdOpt. Then all callers will first call leaderReplicaIfLocal and hold onto the leader replica. They can then use isLeaderLocal to see if the leader has changed subsequently.
> 
> Guozhang Wang wrote:
>     Would this be the same as what we do now? In getReplica(localBrokerId), if the replica map has changed and the id is no longer in the map, it will return None; if the replica map has changed and the id is no longer the leader, that is just the same as when we called leaderReplicaIfLocal() to get the leader and the leader changed immediately afterwards?

After thinking about it a third time, I feel this approach may still not work. The reason is that keeping the replica objects in the delayed produce request is not safe: the replica object may already have been removed from the partition's assigned replica map (so it would be garbage collected if it were not referenced by the delayed produce request), and later on a new replica object could be created with the same replica id. When the delayed produce request is checked, it only checks whether the replica id matches, which is not sufficient.

On the other hand, it now seems harmless to me not to have the read lock. The read consists of the following two steps, and a concurrent write can interleave with them in two ways:

1. check leader replica id;
2. get the replica object from the assigned replica map.

First case: a write that lands before step 1 lets the leader check pass but has not yet reached the map, so the function returns None => this is the same as the write going after the read.
Second case: a write that lands between steps 1 and 2 makes the read from the map return None => this is the same as the write going before the read.

So I think simply making the map concurrent is good enough, but I strongly agree that we may need a separate JIRA to clean up lock hierarchies after the kafka api refactoring.
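
To make the two steps concrete, here is a minimal sketch of a lock-free leaderReplicaIfLocal, assuming a volatile leader id and a java.util.concurrent.ConcurrentHashMap for the assigned replicas. ReplicaSketch and LockFreePartitionSketch are hypothetical stand-ins, not the classes from the patch.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for kafka.cluster.Replica.
final class ReplicaSketch(val brokerId: Int)

// Hypothetical, stripped-down Partition with no read lock.
class LockFreePartitionSketch(localBrokerId: Int) {
  // Single value, so a volatile read is enough.
  @volatile var leaderReplicaIdOpt: Option[Int] = None
  // Concurrent map, so a read never fails under concurrent modification.
  val assignedReplicaMap = new ConcurrentHashMap[Int, ReplicaSketch]()

  def getReplica(id: Int): Option[ReplicaSketch] =
    Option(assignedReplicaMap.get(id)) // null becomes None

  def leaderReplicaIfLocal(): Option[ReplicaSketch] =
    leaderReplicaIdOpt match {
      // Step 1: check the leader replica id.
      case Some(id) if id == localBrokerId =>
        // Step 2: get the replica object from the assigned replica map.
        getReplica(localBrokerId)
      case _ => None
    }
}
```

In both interleavings described above the method returns None, which a caller cannot distinguish from the write being ordered entirely before or after the read.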


- Guozhang


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48283
-----------------------------------------------------------


On July 28, 2014, 6:30 p.m., Guozhang Wang wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/23767/
> -----------------------------------------------------------
> 
> (Updated July 28, 2014, 6:30 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Address Jun's comments: 1. Kept the first comment about removing readlock on leaderReplicaIfLocal for further discussion; 2. Kept the comment on whether to satisfy a delayed fetch immediately if one partition has an error for further discussion; 3. Rebased on KAFKA-1542 follow-up.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchRequest.scala 55a5982c5234f3ac10d4b4ea9fd7c4aa11d34154 
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/cluster/Partition.scala 134aef9c88068443d4d465189f376dd78605b4f8 
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala f8c1b4e674f7515c377c6c30d212130f1ff022dd 
>   core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala afbeffc72e7d7706b44961aecf8519c5c5a3b4b1 
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedRequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 897783cb756de548a8b634876f729b63ffe9925e 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/tools/MetadataRequestProducer.scala PRE-CREATION 
>   core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 
>   core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala 7d4c70ce651b1af45cf9bb69b974aa770de8e59d 
>   core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala e532c2826c96bfa47d9a3c41b4d71dbf69541eac 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/23767/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 23767: Fix KAFKA-1430

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/#review48283
-----------------------------------------------------------


Thanks for the patch. There are some unused imports. Detailed comments below.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84747>

    We may not be able to remove the readlock here. The issue is that this method accesses not only leaderReplicaIdOpt, but also other internal data structures like assignedReplicaMap. Without the lock, the read from the map could fail if it's being concurrently modified.
    
    In general, we can get away without the lock only if we want to read a single internal value. Perhaps we can introduce another function isLeaderLocal() that returns a boolean. This method will only need to access leaderReplicaIdOpt. Then all callers will first call leaderReplicaIfLocal and hold onto the leader replica. They can then use isLeaderLocal to see if the leader has changed subsequently.
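
A minimal sketch of the suggested isLeaderLocal(), assuming leaderReplicaIdOpt is a volatile Option[Int]. PartitionSketch and localBrokerId are hypothetical names used only for illustration.

```scala
// Hypothetical, stripped-down Partition; only isLeaderLocal and
// leaderReplicaIdOpt come from the review comment.
class PartitionSketch(localBrokerId: Int) {
  // A single volatile field: one read yields a consistent value without a lock.
  @volatile var leaderReplicaIdOpt: Option[Int] = None

  // Touches only leaderReplicaIdOpt, so it needs no read lock.
  def isLeaderLocal: Boolean = leaderReplicaIdOpt == Some(localBrokerId)
}
```

A caller that has held onto a leader replica can call isLeaderLocal later to detect that leadership has moved.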



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84749>

    It's possible that we get an UnknownOffsetMetadata during the conversion. In this case, we probably should set HW to logEndOffset.



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84757>

    Instead of using negation, could we do leaderHW.precedes(replica.logEndOffset)?
    
    Also, could we move && to the previous line?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84750>

    Not sure if we need to copy since inSyncReplicas is immutable. We probably just need to do a reference assignment. 
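
As a small illustration of why a reference assignment is enough for an immutable Set (the values and the object name below are made up for this sketch):

```scala
object IsrSnapshotSketch {
  // Returns (snapshot taken by reference, the variable after reassignment).
  def demo(): (Set[Int], Set[Int]) = {
    var inSyncReplicas: Set[Int] = Set(1, 2, 3)
    // Reference assignment: no copy is made.
    val snapshot = inSyncReplicas
    // "Adding" to an immutable Set builds a new Set and rebinds the variable;
    // the Set that snapshot points to is never mutated.
    inSyncReplicas = inSyncReplicas + 4
    (snapshot, inSyncReplicas)
  }
}
```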



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84812>

    This seems to be an existing problem. If ack=-1, a safer check is HW >= requiredOffset. This way, we will be sure that if ISR expands, the acked message is guaranteed to be in the replicas newly added to ISR.
    
    The following is an example that shows the issue with the existing check. Suppose that all replicas in ISR are at offset 10, but HW is still at 8 and we call checkEnoughReplicasReachOffset on offset 9.  The check will be satisfied and the message is considered committed. We will be updating HW to 10 pretty soon. However, before that happens, another replica whose LEO is at 8 can be added to ISR. This replica won't have message 9, which is acked as committed. 
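
The example above can be replayed with plain numbers. This is only a sketch: AckCheckSketch and its two methods are hypothetical and reduce checkEnoughReplicasReachOffset to the comparison being discussed.

```scala
object AckCheckSketch {
  // Existing check as described above: every ISR replica's LEO has
  // reached the required offset.
  def allIsrReached(isrLogEndOffsets: Seq[Long], requiredOffset: Long): Boolean =
    isrLogEndOffsets.forall(_ >= requiredOffset)

  // Safer check proposed for ack = -1: the high watermark has reached it.
  def hwReached(highWatermark: Long, requiredOffset: Long): Boolean =
    highWatermark >= requiredOffset

  // Numbers taken from the example in the comment.
  val isrLogEndOffsets: Seq[Long] = Seq(10L, 10L)
  val highWatermark: Long = 8L
  val requiredOffset: Long = 9L

  val existingCheckPasses: Boolean = allIsrReached(isrLogEndOffsets, requiredOffset)
  val proposedCheckPasses: Boolean = hwReached(highWatermark, requiredOffset)
}
```

With these numbers the existing check passes while the HW-based check does not, so the HW-based check would keep the produce request waiting until HW advances.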



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84832>

    Perhaps we could create TopicPartionRequestKey just once?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/23767/#comment84833>

    Move && to previous line?



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/23767/#comment84834>

    Should we rename highWatermarkValue and logEndOffsetValue to highWatermarkMetadata and logEndOffsetMetadata?



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment84698>

    It would be a bit confusing to reason about the consistency between nextOffset and nextOffsetMetadata since they are not updated atomically. Could we just keep nextOffsetMetadata?
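
One way to read this suggestion, sketched with hypothetical types (NextOffsetSketch and LogSketch are not the classes in Log.scala): keep a single volatile metadata value and derive the plain offset from it, so the two can never disagree.

```scala
// Hypothetical metadata: message offset plus a byte position.
case class NextOffsetSketch(messageOffset: Long, relativePosition: Int)

class LogSketch {
  // The only stored state; replaced as a whole on each append.
  @volatile private var nextOffsetMetadata = NextOffsetSketch(0L, 0)

  // Derived from the metadata instead of being tracked separately.
  def logEndOffset: Long = nextOffsetMetadata.messageOffset

  def logEndOffsetMetadata: NextOffsetSketch = nextOffsetMetadata

  // Appends are serialized; readers see either the old or the new value.
  def append(messageCount: Int, bytes: Int): Unit = synchronized {
    val cur = nextOffsetMetadata
    nextOffsetMetadata =
      NextOffsetSketch(cur.messageOffset + messageCount, cur.relativePosition + bytes)
  }
}
```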



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/23767/#comment84699>

    Perhaps we can add a bit more detail to the comment. So, we are in the situation that the startOffset is in range, but we can't find a single message whose offset is >= startOffset. One possibility seems to be that all messages after startOffset have been deleted due to compaction. Is that the only case? Let's describe all situations when this can happen. 



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84915>

    In DelayedProduce, we don't send the response immediately if one partition has an error. Should we do the same thing for DelayedFetch? Will that make the logic simpler?



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84836>

    All those should probably be private.



core/src/main/scala/kafka/server/DelayedFetch.scala
<https://reviews.apache.org/r/23767/#comment84913>

    Will this case ever happen? If so, could we add a comment how this can happen?



core/src/main/scala/kafka/server/FetchRequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84746>

    This can be private.



core/src/main/scala/kafka/server/KafkaApis.scala
<https://reviews.apache.org/r/23767/#comment84910>

    If ack is >1, it won't be -1. So "but no = -1" is redundant.



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84814>

    Would it be better to name it offsetOrdering?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84687>

    The ordering doesn't match that in the signature.



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84919>

    Should we just use one constructor with defaults?
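
A sketch of what a single constructor with defaults could look like. The field names and the -1 sentinels are assumptions for illustration, not taken from the patch.

```scala
// Hypothetical shape of an offset-metadata class with default arguments
// replacing the auxiliary constructors.
case class OffsetMetadataSketch(
  messageOffset: Long,
  segmentBaseOffset: Long = -1L,       // assumed "unknown" sentinel
  relativePositionInSegment: Int = -1  // assumed "unknown" sentinel
)
```

Callers that only know the message offset can write OffsetMetadataSketch(5L), and callers with full information pass all three arguments to the same constructor.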



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/23767/#comment84689>

    Do we need the space after !?



core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84736>

    This can be private.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84837>

    TopicAndPartition -> (PartitionData, OffsetMetadata)
    
    It probably will be clearer if we use a case class PartitionDataAndOffsetMetadata, instead of using a pair.
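
To illustrate the readability difference, a sketch with stand-in types. Only the name PartitionDataAndOffsetMetadata comes from the comment; the field names and the other classes are hypothetical.

```scala
// Stand-ins for the real key and payload types.
case class TopicAndPartitionSketch(topic: String, partition: Int)
case class FetchedDataSketch(highWatermark: Long)
case class LeoSketch(messageOffset: Long)

// Named wrapper replacing the (PartitionData, OffsetMetadata) pair.
case class PartitionDataAndOffsetMetadata(data: FetchedDataSketch, offsetMetadata: LeoSketch)

object PairVsCaseClass {
  val tp = TopicAndPartitionSketch("test", 0)

  // With a pair, call sites read as result(tp)._2.
  val asPair: Map[TopicAndPartitionSketch, (FetchedDataSketch, LeoSketch)] =
    Map(tp -> ((FetchedDataSketch(8L), LeoSketch(10L))))

  // With a case class, call sites read as result(tp).offsetMetadata.
  val asCaseClass: Map[TopicAndPartitionSketch, PartitionDataAndOffsetMetadata] =
    Map(tp -> PartitionDataAndOffsetMetadata(FetchedDataSketch(8L), LeoSketch(10L)))
}
```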



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84839>

    Perhaps it will be clearer if we return a FetchResponseAndOffsetMetadata instead of a pair?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84754>

    Would updateReplicaLEOAndHW be enough?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/23767/#comment84912>

    Do we need to log (topic, partition) twice?



core/src/main/scala/kafka/server/RequestKey.scala
<https://reviews.apache.org/r/23767/#comment84920>

    Should we rename this to DelayedRequestKey?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84921>

    These comments may need to be changed according to the comments below.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84718>

    I actually think it's more intuitive to return true if the request is satisfied by the caller. Then, we can assign a meaningful return val in the caller.
    
    val isSatisfiedByMe = producerRequestPurgatory.checkAndMaybeWatch(request)
    
    Also, we need to explain this method better. How about the following comment?
    
    Potentially add the request for watch on all keys. Return true iff the request is satisfied and the satisfaction is done by the caller.
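
A minimal sketch of the proposed return-value contract, assuming the satisfied bit is an AtomicBoolean. DelayedRequestSketch and PurgatorySketch are hypothetical and leave out keys, expiration and the watcher lists of the real RequestPurgatory.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a delayed request.
class DelayedRequestSketch {
  val satisfied = new AtomicBoolean(false)
}

// Hypothetical purgatory reduced to the return-value contract only.
class PurgatorySketch(isComplete: DelayedRequestSketch => Boolean) {
  private val watched = ListBuffer.empty[DelayedRequestSketch]

  // Returns true iff the request is satisfied and this caller is the one
  // that flipped the satisfied bit; an incomplete request is watched.
  def checkAndMaybeWatch(request: DelayedRequestSketch): Boolean = synchronized {
    if (isComplete(request)) {
      // compareAndSet succeeds for exactly one caller.
      request.satisfied.compareAndSet(false, true)
    } else {
      watched += request
      false
    }
  }

  def watchedCount: Int = synchronized(watched.size)
}
```

With this contract a caller can write val isSatisfiedByMe = purgatory.checkAndMaybeWatch(request) and respond only when the value is true.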



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84728>

    key doesn't seem to be used.



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/23767/#comment84709>

    Could we name this checkAndMaybeAdd and add the comment?



core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
<https://reviews.apache.org/r/23767/#comment84922>

    Can this just be created as Map(a -> b)?
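
For illustration, assuming the test built a single-entry map step by step (the actual test code is not shown in this thread), the literal form is equivalent and shorter:

```scala
import scala.collection.mutable

object MapLiteralSketch {
  // Verbose form: create a mutable map, then add the single entry.
  val verbose: scala.collection.Map[String, Int] = {
    val m = new mutable.HashMap[String, Int]()
    m.put("a", 1)
    m
  }

  // Literal form suggested in the review.
  val literal: Map[String, Int] = Map("a" -> 1)
}
```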


- Jun Rao


Re: Review Request 23767: Fix KAFKA-1430

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/23767/
-----------------------------------------------------------

(Updated July 21, 2014, 7:53 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description
-------

Rebased on KAFKA-1462: 1. LogSegment.read() will also return fetch info, even if the corresponding message set is empty; 2. Purgatory checks satisfaction synchronously in checkAndMaybeWatch, and will only return false if this thread successfully set the satisfied bit to true; 3. Remove the read lock on Partition's reading of the leaderOpt and epoch, making them volatile instead, since these two functions are each just a single read; 4. Fix some minor issues in TestEndToEndLatency; 5. Other minor fixes


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/consumer/SimpleConsumer.scala 0e64632210385ef63c2ad3445b55ac4f37a63df2 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala fd5f12ee31e78cdcda8e24a0ab3e1740b3928884 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/tools/TestEndToEndLatency.scala 5f8f6bc46ae6cbbe15cec596ac99d0b377d1d7ef 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/23767/diff/


Testing
-------


Thanks,

Guozhang Wang