Posted to dev@kafka.apache.org by Guozhang Wang <gu...@linkedin.com> on 2014/06/06 02:07:37 UTC

Re: Review Request 21588: Fix KAFKA-1430, round two

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/
-----------------------------------------------------------

(Updated June 6, 2014, 12:07 a.m.)


Review request for kafka.


Summary (updated)
-----------------

Fix KAFKA-1430, round two


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

incorporate Jun's last comment
one pass before upload
dummy 2
dummy
testing 8
testing 6
testing 5
testing 4
testing 3
testing 2
testing
dummy
dummy
Dummy
relaying code I
minor clean-up
Fix Another minor issue
minor fix purgatory APIs
Minor fix on HW position and base offset
Some minor debug
hw and log active base offset and position
DelayedFetch request
dummy
dummy
LogReadInfo added
dummy commit: producer request purgatory
Dummy commit: refactoring purgatory phase 1


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1430, round two

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/21588/#review44998
-----------------------------------------------------------


Overall, the patch looks pretty good to me. Some comments below.


core/src/main/scala/kafka/api/FetchResponse.scala
<https://reviews.apache.org/r/21588/#comment79629>

    There is no need to specify val explicitly. A case class automatically makes every constructor parameter a val.
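    For illustration only, a tiny hypothetical case class (not the actual FetchResponse code) showing why the explicit `val` is redundant: case-class constructor parameters are already public, immutable fields.

```scala
// Hypothetical stand-in for a response's partition data; not the class from the patch.
case class PartitionDataSketch(errorCode: Short, hw: Long)

object CaseClassValDemo {
  val data: PartitionDataSketch = PartitionDataSketch(0.toShort, 42L)

  // `hw` is a public, immutable field even though the constructor never says `val`;
  // `data.hw = 43L` would not compile.
  val hwValue: Long = data.hw

  // Declaring `case class PartitionDataSketch(val errorCode: Short, val hw: Long)`
  // would produce the same class, which is why the explicit `val` adds nothing.
}
```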



core/src/main/scala/kafka/api/ProducerResponse.scala
<https://reviews.apache.org/r/21588/#comment79630>

    Same as above.



core/src/main/scala/kafka/log/FileMessageSet.scala
<https://reviews.apache.org/r/21588/#comment79633>

    Should this be a val instead of a function?



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79634>

    Does time need to be a val?



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79635>

    Hmm, when we get an empty ByteBufferMessage, it may be important to return the corresponding offsetMetadata.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79636>

    This can only happen for regular consumer fetch requests. So returning an UnknowOffset is OK, since it is not going to be used for purgatory checking. We should describe this in the comment.



core/src/main/scala/kafka/log/Log.scala
<https://reviews.apache.org/r/21588/#comment79637>

    Perhaps this could be named convertToOffsetMetadata()?



core/src/main/scala/kafka/server/KafkaServer.scala
<https://reviews.apache.org/r/21588/#comment79640>

    Could this be done inside KafkaApis?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/21588/#comment79642>

    Could we make this a case class? Then equals() doesn't need to be overridden.



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/21588/#comment79641>

    older() and peer() could be named better. How about offsetOnOlderSegment and offsetOnSameSegment?



core/src/main/scala/kafka/server/LogOffsetMetadata.scala
<https://reviews.apache.org/r/21588/#comment79643>

    Maybe positionDiff()?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment79644>

    Do we need to change this to override val?



core/src/main/scala/kafka/server/RequestPurgatory.scala
<https://reviews.apache.org/r/21588/#comment79645>

    We will need to sync on t when doing the check (same as in line 187) to avoid the race condition that can cause two responses to be sent for the same request.
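    A minimal sketch of the kind of guard suggested above, assuming a hypothetical `DelayedRequestSketch` with an `AtomicBoolean` satisfied flag (the names are illustrative, not from the patch): the satisfaction check and the flag flip happen while holding the request's monitor, and only the thread whose `compareAndSet(false, true)` succeeds sends a response.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical, simplified delayed request; the names are illustrative only.
class DelayedRequestSketch {
  val satisfied = new AtomicBoolean(false)
  val responsesSent = new AtomicInteger(0)
}

object RespondOnce {
  // May be called both from the watch path and from the path that re-checks
  // watchers after an update. The check and the flag flip happen under the
  // request's own lock, and only the caller that flips `satisfied` from false
  // to true responds.
  def tryRespond(req: DelayedRequestSketch, isSatisfied: => Boolean): Boolean =
    req.synchronized {
      if (isSatisfied && req.satisfied.compareAndSet(false, true)) {
        req.responsesSent.incrementAndGet() // stands in for sending the response
        true
      } else {
        false
      }
    }
}
```

    Even if many threads call tryRespond on the same request concurrently, responsesSent ends at 1.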


- Jun Rao


On June 6, 2014, 12:41 a.m., Guozhang Wang wrote:
> 
> 
> (Updated June 6, 2014, 12:41 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1430
>     https://issues.apache.org/jira/browse/KAFKA-1430
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> 1. Change the watch() API to checkAndMaybeWatch(). In that function, the purgatory tries to add the delayed request to each keyed watcher list.
> 
> a). When a watcher tries to add the delayed request, it first checks whether the request is already satisfied, and only adds it if it is not yet satisfied.
> 
> b). If one of the watchers fails to add the request because it is already satisfied, checkAndMaybeWatch() returns immediately.
> 
> c). The purgatory size gauge is now the watcher lists' size plus the delayed queue size.
> 
> 2. Add a LogOffsetMetadata structure, which contains a) the message offset, b) the segment file base offset, and c) the relative physical position in the segment file.
> 
> Each replica then maintains the log offset metadata for:
> 
> a) The current HW offset.
> 
> On the leader replica, the metadata includes all three values; on a follower replica, the metadata only keeps the message offset (the other two are just -1).
> 
> When a partition becomes the leader, it uses its HW message offset to construct the other two values of the metadata by searching its log.
> 
> The HW offset is updated in the partition's maybeUpdateLeaderHW function.
> 
> b) The current log end offset.
> 
> Every replica maintains its own log end offset, which is updated upon log append.
> 
> The leader replica also maintains the other replicas' log end offset metadata, which is updated from the follower fetch requests.
> 
> 3. Move the readMessageSet logic from KafkaApis to ReplicaManager as part of the server-side refactoring.
> 
> The log.read function now returns the fetch offset metadata along with the message set read.
> 
> 4. The delayed fetch request then keeps, for each partition it fetches, the fetch log offset metadata retrieved from the readMessageSet() call.
> 
> 5. A delayed fetch request is now satisfied when any of the following holds:
> 
> a). This broker is no longer the leader for ANY of the partitions it tries to fetch
> b). The fetch offset is not located on the fetchable segment of the log
> c). The accumulated bytes from all the fetchable segments exceed the minimum bytes
> 
> For a follower fetch request, the fetchable segment is the log's active segment; for a consumer fetch request, the fetchable segment is the segment containing the HW.
> 
> Cases b and c are checked using the fetch log offset metadata stored in the delayed fetch.
> 
> 6. A delayed producer request's satisfaction criterion remains the same: it is satisfied once the number of replicas specified by the request's ACK setting have replicated the data for each partition.
> 
> 7. The conditions for checking whether delayed produce/fetch requests can be unblocked are now:
> 
> Whenever the leader's HW moves in partition.maybeUpdateLeaderHW, try to unblock delayed produce/fetch requests.
> 
> Whenever a follower's log end offset moves, try to unblock delayed produce requests.
> 
> Whenever a local append on the leader finishes, try to unblock delayed (follower) fetch requests.
> 
> 8. In KafkaApis, when checkAndMaybeWatch returns false for a delayed produce/fetch, respond immediately.
> 
> To let the purgatory respond either after checkAndMaybeWatch returns false or when a delayed request is satisfied or expires, it needs access to the request channel to send the response back; this needs to be removed after the refactoring completes.
> 
> 9. Move the purgatories, delayed requests, request keys and metrics out of KafkaApis as part of the server-side refactoring.
> 
> The replica manager needs to be initialized with these purgatories after KafkaApis is constructed, since for now both need to access them. This needs to be removed after the refactoring completes.
> 
> 10. Related test changes, plus some other minor comment/logging changes.
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
>   core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
>   core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
>   core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
>   core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
>   core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
>   core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
>   core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
>   core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
>   core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
>   core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba 
>   core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
>   core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
>   core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
>   core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
>   core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
>   core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
>   core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
>   core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525 
>   core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 
> 
> Diff: https://reviews.apache.org/r/21588/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Guozhang Wang
> 
>


Re: Review Request 21588: Fix KAFKA-1430: Purgatory Redesign

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated July 11, 2014, 6:07 p.m.)


Review request for kafka.


Summary (updated)
-----------------

Fix KAFKA-1430: Purgatory Redesign


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description
-------

Rebased: Incorporated Jun's comments: 1. LogSegment.read() now also returns fetch info, even if the corresponding message set is empty; 2. The purgatory now checks satisfaction synchronously in checkAndMaybeWatch, and only returns false if this thread successfully sets the satisfied bit to true; 3. Other minor fixes.


Diffs
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1515

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated July 11, 2014, 5:59 p.m.)


Review request for kafka.


Summary (updated)
-----------------

Fix KAFKA-1515


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

Rebased: Incorporated Jun's comments: 1. LogSegment.read() now also returns fetch info, even if the corresponding message set is empty; 2. The purgatory now checks satisfaction synchronously in checkAndMaybeWatch, and only returns false if this thread successfully sets the satisfied bit to true; 3. Other minor fixes.


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala f2ca8562f833f09d96ec4bd37efcacf69cd84b2e 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1430 Round Three

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated June 10, 2014, 6:25 p.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description
-------

Incorporated Jun's comments: 1. LogSegment.read() now also returns fetch info, even if the corresponding message set is empty; 2. The purgatory now checks satisfaction synchronously in checkAndMaybeWatch, and only returns false if this thread successfully sets the satisfied bit to true; 3. Other minor fixes.


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1430 Round Three

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated June 10, 2014, 6:22 p.m.)


Review request for kafka.


Summary (updated)
-----------------

Fix KAFKA-1430 Round Three


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

Incorporated Jun's comments: 1. LogSegment.read() now also returns fetch info, even if the corresponding message set is empty; 2. The purgatory now checks satisfaction synchronously in checkAndMaybeWatch, and only returns false if this thread successfully sets the satisfied bit to true; 3. Other minor fixes.


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 6b7603728ae5217565d68b92dd5349e7c6508f31 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala 6db245c956d2172cde916defdb0749081bf891fd 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 0ec120a4a953114e88c575dd6b583874371a09e3 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1430, round two

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated June 6, 2014, 12:41 a.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

1. Change the watch() API to checkAndMaybeWatch(). In that function, the purgatory tries to add the delayed request to each keyed watcher list.

a). When a watcher tries to add the delayed request, it first checks whether the request is already satisfied, and only adds it if it is not yet satisfied.

b). If one of the watchers fails to add the request because it is already satisfied, checkAndMaybeWatch() returns immediately.

c). The purgatory size gauge is now the watcher lists' size plus the delayed queue size.
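A minimal Scala sketch of the check-then-watch flow in point 1. DelayedSketch, WatchersSketch and PurgatorySketch are hypothetical stand-ins, not classes from the patch, and the real purgatory also has an expiration queue and metrics. Each watcher checks satisfaction and adds the request under the same lock, and checkAndMaybeWatch stops at the first watcher that finds the request already satisfied.

```scala
import scala.collection.mutable

// Hypothetical delayed request: watched under `keys`, satisfied when `check()` is true.
class DelayedSketch(val keys: Seq[String], check: () => Boolean) {
  def isSatisfied: Boolean = check()
}

class WatchersSketch {
  private val requests = mutable.ListBuffer.empty[DelayedSketch]

  // Check and add under one lock, so an update cannot slip in between them.
  def checkAndMaybeAdd(r: DelayedSketch): Boolean = synchronized {
    if (r.isSatisfied) false
    else { requests += r; true }
  }

  def size: Int = synchronized(requests.size)
}

class PurgatorySketch {
  private val watchersByKey = mutable.Map.empty[String, WatchersSketch]

  private def watchersFor(key: String): WatchersSketch = synchronized {
    watchersByKey.getOrElseUpdate(key, new WatchersSketch)
  }

  // forall stops at the first watcher that finds the request satisfied and
  // returns false, so the caller can respond immediately. Watchers visited
  // earlier keep their entry until some later cleanup removes it.
  def checkAndMaybeWatch(r: DelayedSketch): Boolean =
    r.keys.forall(key => watchersFor(key).checkAndMaybeAdd(r))

  // Watched-entry count for the size gauge in 1c (a real purgatory would add
  // the delayed queue size as well).
  def watchedCount: Int = synchronized(watchersByKey.values.map(_.size).sum)
}
```

Checking and adding under the same watcher lock is what keeps a request from being added just after an update that would have satisfied it.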

2. Add a LogOffsetMetadata structure, which contains a) the message offset, b) the segment file base offset, and c) the relative physical position in the segment file.

Each replica then maintains the log offset metadata for:

a) The current HW offset.

On the leader replica, the metadata includes all three values; on a follower replica, the metadata only keeps the message offset (the other two are just -1).

When a partition becomes the leader, it uses its HW message offset to construct the other two values of the metadata by searching its log.

The HW offset is updated in the partition's maybeUpdateLeaderHW function.

b) The current log end offset.

Every replica maintains its own log end offset, which is updated upon log append.

The leader replica also maintains the other replicas' log end offset metadata, which is updated from the follower fetch requests.
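A hedged sketch of the structure described in point 2, written as a case class so equals() comes for free, and using the method names suggested in review (offsetOnOlderSegment, offsetOnSameSegment, positionDiff). LogOffsetMetadataSketch and the -1 sentinel names are assumptions for illustration; the class in the patch may differ.

```scala
// Hypothetical sketch of LogOffsetMetadata; not the patch's actual class.
case class LogOffsetMetadataSketch(
    messageOffset: Long,
    segmentBaseOffset: Long = LogOffsetMetadataSketch.UnknownSegBaseOffset,
    relativePositionInSegment: Int = LogOffsetMetadataSketch.UnknownFilePosition) {

  // A follower's HW only carries the message offset; the other two fields are -1.
  def messageOffsetOnly: Boolean =
    segmentBaseOffset == LogOffsetMetadataSketch.UnknownSegBaseOffset &&
      relativePositionInSegment == LogOffsetMetadataSketch.UnknownFilePosition

  def offsetOnOlderSegment(that: LogOffsetMetadataSketch): Boolean = {
    require(!messageOffsetOnly && !that.messageOffsetOnly, "segment info unknown")
    segmentBaseOffset < that.segmentBaseOffset
  }

  def offsetOnSameSegment(that: LogOffsetMetadataSketch): Boolean = {
    require(!messageOffsetOnly && !that.messageOffsetOnly, "segment info unknown")
    segmentBaseOffset == that.segmentBaseOffset
  }

  // Byte distance between two positions; only meaningful within one segment.
  def positionDiff(that: LogOffsetMetadataSketch): Int = {
    require(offsetOnSameSegment(that), "offsets are on different segments")
    relativePositionInSegment - that.relativePositionInSegment
  }
}

object LogOffsetMetadataSketch {
  val UnknownSegBaseOffset: Long = -1L
  val UnknownFilePosition: Int = -1
}
```

For example, a leader HW at (100, 80, 4096) and a log end offset at (120, 80, 6144) are on the same segment, 2048 bytes apart.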

3. Move the readMessageSet logic from KafkaApis to ReplicaManager as part of the server-side refactoring.

The log.read function now returns the fetch offset metadata along with the message set read.

4. The delayed fetch request then keeps, for each partition it fetches, the fetch log offset metadata retrieved from the readMessageSet() call.

5. A delayed fetch request is now satisfied when any of the following holds:

a) This broker is no longer the leader for some of the partitions it tries to fetch.
b) The fetch offset is not located on the fetchable segment of the log.
c) The bytes accumulated from all the fetchable segments exceed the minimum bytes.

For a follower fetch request, the fetchable segment is the log's active segment; for a consumer fetch request, it is the segment containing the HW.

Cases b) and c) are checked using the fetch log offset metadata stored in the delayed fetch.
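The item 5 criteria can be sketched as a pure function over the stored fetch metadata. Everything here is hypothetical (PartitionView, the string partition keys, the function name), and it assumes the criteria are alternatives, so any one of them completes the fetch.

```scala
final case class LogOffsetMetadata(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)

// Hypothetical snapshot of one partition as seen by the delayed-fetch check.
final case class PartitionView(
    isLocalLeader: Boolean,
    logEndOffset: LogOffsetMetadata,
    highWatermark: LogOffsetMetadata)

// `fetchOffsets` holds, per partition, the metadata recorded when the fetch
// was first read (item 4).
def isFetchSatisfied(
    fetchOffsets: Map[String, LogOffsetMetadata],
    partitions: Map[String, PartitionView],
    isFollowerFetch: Boolean,
    minBytes: Int): Boolean = {
  var accumulatedBytes = 0L
  val it = fetchOffsets.iterator
  while (it.hasNext) {
    val (tp, fetchOffset) = it.next()
    partitions.get(tp) match {
      // Case a: not (or no longer) the leader. An unknown partition is
      // treated the same way in this sketch.
      case None                        => return true
      case Some(p) if !p.isLocalLeader => return true
      case Some(p) =>
        // Followers may read up to the log end; consumers only up to the HW.
        val end = if (isFollowerFetch) p.logEndOffset else p.highWatermark
        if (end.messageOffset != fetchOffset.messageOffset) {
          // Case b: the fetchable end is on a different segment.
          if (end.segmentBaseOffset != fetchOffset.segmentBaseOffset) return true
          if (fetchOffset.messageOffset < end.messageOffset)
            accumulatedBytes += end.relativePositionInSegment - fetchOffset.relativePositionInSegment
        }
    }
  }
  // Case c: enough bytes have accumulated (treated here as at least minBytes).
  accumulatedBytes >= minBytes
}
```

Only byte positions within one segment are subtracted, which is why case b has to be checked before case c.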

6. The satisfaction criterion for delayed produce requests is unchanged: a request is satisfied once, for each partition, the number of replicas specified by its ACK setting have replicated the data.
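A sketch of the produce check in item 6, assuming the usual meaning of the required-acks setting (-1 waits for every in-sync replica, a positive n waits for at least n in-sync replicas). The function name and map-based inputs are hypothetical; each required offset is the log end offset a replica must reach to hold the produced data.

```scala
// `requiredOffsets`: partition -> log end offset a replica must reach.
// `isrLogEndOffsets`: partition -> (in-sync broker id -> current log end offset).
def isProduceSatisfied(
    requiredOffsets: Map[String, Long],
    isrLogEndOffsets: Map[String, Map[Int, Long]],
    requiredAcks: Int): Boolean =
  requiredOffsets.forall { case (tp, requiredOffset) =>
    val isr = isrLogEndOffsets.getOrElse(tp, Map.empty[Int, Long])
    // Number of in-sync replicas that already hold the produced data.
    val acks = isr.count { case (_, leo) => leo >= requiredOffset }
    if (requiredAcks < 0) isr.nonEmpty && acks == isr.size // all in-sync replicas
    else acks >= requiredAcks                              // at least requiredAcks
  }
```

Every partition in the request must pass, so one lagging partition keeps the whole produce request delayed.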

7. Delayed produce/fetch requests are now checked for unblocking under these conditions:

Whenever the leader's HW moves in partition.maybeUpdateLeaderHW, try to unblock delayed produce and fetch requests.

Whenever a follower's log end offset moves, try to unblock delayed produce requests.

Whenever a local append on the leader finishes, try to unblock delayed (follower) fetch requests.
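The trigger rules in item 7 can be summarized as a small lookup, paired with a sketch of when maybeUpdateLeaderHW would report a move. The event names are hypothetical, and the HW rule used here (minimum log end offset over the in-sync replicas, never decreasing) is an assumption about Kafka replication rather than something stated in this description.

```scala
// Hypothetical names for the three events listed in item 7.
sealed trait PartitionEvent
case object LeaderHwMoved extends PartitionEvent
case object FollowerLogEndOffsetMoved extends PartitionEvent
case object LeaderAppendFinished extends PartitionEvent

// Which delayed requests are re-checked after each event.
def purgatoriesToCheck(event: PartitionEvent): Set[String] = event match {
  case LeaderHwMoved             => Set("produce", "fetch")
  case FollowerLogEndOffsetMoved => Set("produce")
  case LeaderAppendFinished      => Set("fetch") // follower fetches
}

// Assumed HW rule: the minimum log end offset across in-sync replicas,
// never moving backwards. Returns the new HW only when it advances,
// i.e. when LeaderHwMoved would fire.
def maybeUpdateLeaderHW(currentHw: Long, isrLogEndOffsets: Iterable[Long]): Option[Long] =
  if (isrLogEndOffsets.isEmpty) None
  else {
    val newHw = isrLogEndOffsets.min
    if (newHw > currentHw) Some(newHw) else None
  }
```

Under this assumed rule, a single follower fetch can fire two events: its own log end offset moves, and if it was the slowest in-sync replica the HW advances as well.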

8. In KafkaApis, when checkAndMaybeWatch returns false for a delayed produce/fetch request, respond immediately.

For the purgatory to respond either after checkAndMaybeWatch returns false or when a delayed request is satisfied or expires, it needs access to the request channel to send the response back; this dependency should be removed once the refactoring is complete.
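A minimal sketch of the handler-side control flow in item 8, under stated assumptions: DelayedPurgatory, ResponseSink and handleDelayedRequest are hypothetical stand-ins for the purgatory and the request channel, and responses are plain strings.

```scala
import scala.collection.mutable

// Hypothetical purgatory interface: true means "now watched",
// false means "already satisfied".
trait DelayedPurgatory[R] {
  def checkAndMaybeWatch(request: R): Boolean
}

// Hypothetical stand-in for the request channel; records what was sent.
final class ResponseSink {
  val sent = mutable.ListBuffer.empty[String]
  def send(response: String): Unit = sent += response
}

// If the purgatory declines to watch the request, answer right away;
// otherwise the purgatory owns the response (on satisfaction or expiry).
def handleDelayedRequest[R](
    request: R,
    purgatory: DelayedPurgatory[R],
    channel: ResponseSink,
    buildResponse: R => String): Unit =
  if (!purgatory.checkAndMaybeWatch(request)) channel.send(buildResponse(request))
```

When checkAndMaybeWatch returns true, the handler sends nothing; responding later is then the purgatory's job, which is why it needs the request channel for now.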

9. Move the purgatories, delayed requests, request keys, and metrics out of KafkaApis as part of the server-side refactoring.

For now, the replica manager must be initialized with these purgatories after KafkaApis is constructed, since both need access to them. This should be removed once the refactoring is complete.

10. Related test changes, plus some other minor comment and logging changes.


Diffs
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1430, round two

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated June 6, 2014, 12:40 a.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

remove log end position from the append info


add new files


incorporate Jun's last comment


one pass before upload


dummy 2


dummy


testing 8


testing 6


testing 5


testing 4


testing 3


testing 2


testing


dummy


dummy


Dummy


relaying code I


minor clean-up


Fix Another minor issue


minor fix purgatory APIs


Minor fix on HW position and base offset


Some minor debug


hw and log active base offset and position


DelayedFetch request


dummy


dummy


LogReadInfo added


dummy commit: producer request purgatory


Dummy commit: refactoring purgatory phase 1


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1430, round two

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated June 6, 2014, 12:37 a.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

1. Change the watch() API to checkAndMaybeWatch(). In that function, the purgatory tries to add the delayed request to the watcher list of each of its keys.

a) Before adding the delayed request, each watcher first checks whether the request is already satisfied, and adds it only if it is not.

b) If any watcher does not add the request because it is already satisfied, checkAndMaybeWatch() returns false immediately.

c) The purgatory size gauge is now the total size of the watcher lists plus the size of the delayed queue.

2. Add a LogOffsetMetadata structure, which contains a) the message offset, b) the base offset of the segment file, and c) the relative physical position within the segment file.

Each replica then maintains log offset metadata for:

a) The current HW offset.

On the leader replica, the metadata includes all three values; on a follower replica, it keeps only the message offset (the other two values are -1).

When a partition becomes the leader on this broker, it looks up its HW message offset in its log to reconstruct the other two values of the metadata.

The HW offset is updated in the partition's maybeUpdateLeaderHW function.

b) The current log end offset.

Every replica maintains its own log end offset, which is updated on each log append.

The leader replica also maintains the log end offset metadata of the other replicas, which is updated from their follower fetch requests.

3. Move the readMessageSet logic from KafkaApis to ReplicaManager as part of the server-side refactoring.

The log.read function now returns the fetch offset metadata along with the message set it read.

4. The delayed fetch request then keeps, for each partition it fetches, the fetch log offset metadata returned by the readMessageSet() call.

5. A delayed fetch request is now satisfied when any of the following holds:

a) This broker is no longer the leader for some of the partitions it tries to fetch.
b) The fetch offset is not located on the fetchable segment of the log.
c) The bytes accumulated from all the fetchable segments exceed the minimum bytes.

For a follower fetch request, the fetchable segment is the log's active segment; for a consumer fetch request, it is the segment containing the HW.

Cases b) and c) are checked using the fetch log offset metadata stored in the delayed fetch.

6. The satisfaction criterion for delayed produce requests is unchanged: a request is satisfied once, for each partition, the number of replicas specified by its ACK setting have replicated the data.

7. Delayed produce/fetch requests are now checked for unblocking under these conditions:

Whenever the leader's HW moves in partition.maybeUpdateLeaderHW, try to unblock delayed produce and fetch requests.

Whenever a follower's log end offset moves, try to unblock delayed produce requests.

Whenever a local append on the leader finishes, try to unblock delayed (follower) fetch requests.

8. In KafkaApis, when checkAndMaybeWatch returns false for a delayed produce/fetch request, respond immediately.

For the purgatory to respond either after checkAndMaybeWatch returns false or when a delayed request is satisfied or expires, it needs access to the request channel to send the response back; this dependency should be removed once the refactoring is complete.

9. Move the purgatories, delayed requests, request keys, and metrics out of KafkaApis as part of the server-side refactoring.

For now, the replica manager must be initialized with these purgatories after KafkaApis is constructed, since both need access to them. This should be removed once the refactoring is complete.

10. Related test changes, plus some other minor comment and logging changes.


Diffs
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang


Re: Review Request 21588: Fix KAFKA-1430, round two

Posted by Guozhang Wang <gu...@linkedin.com>.

(Updated June 6, 2014, 12:13 a.m.)


Review request for kafka.


Bugs: KAFKA-1430
    https://issues.apache.org/jira/browse/KAFKA-1430


Repository: kafka


Description (updated)
-------

add new files


incorporate Jun's last comment


one pass before upload


dummy 2


dummy


testing 8


testing 6


testing 5


testing 4


testing 3


testing 2


testing


dummy


dummy


Dummy


relaying code I


minor clean-up


Fix Another minor issue


minor fix purgatory APIs


Minor fix on HW position and base offset


Some minor debug


hw and log active base offset and position


DelayedFetch request


dummy


dummy


LogReadInfo added


dummy commit: producer request purgatory


Dummy commit: refactoring purgatory phase 1


Diffs (updated)
-----

  core/src/main/scala/kafka/api/FetchResponse.scala d117f10f724b09d6deef0df3a138d28fc91aa13a 
  core/src/main/scala/kafka/api/ProducerResponse.scala 5a1d8015379b1f5d9130d9edca89544ee7dd0039 
  core/src/main/scala/kafka/cluster/Partition.scala 518d2df5ae702d8c0937e1f9603fd11a54e24be8 
  core/src/main/scala/kafka/cluster/Replica.scala 5e659b4a5c0256431aecc200a6b914472da9ecf3 
  core/src/main/scala/kafka/log/FileMessageSet.scala b2652ddbe2f857028d5980e29a008b2c614694a3 
  core/src/main/scala/kafka/log/Log.scala b7bc5ffdecba7221b3d2e4bca2b0c703bc74fa12 
  core/src/main/scala/kafka/log/LogCleaner.scala 2faa196a4dc612bc634d5ff5f5f275d09073f13b 
  core/src/main/scala/kafka/log/LogSegment.scala 0d6926ea105a99c9ff2cfc9ea6440f2f2d37bde8 
  core/src/main/scala/kafka/server/AbstractFetcherThread.scala 3b15254f32252cf824d7a292889ac7662d73ada1 
  core/src/main/scala/kafka/server/DelayedFetch.scala PRE-CREATION 
  core/src/main/scala/kafka/server/DelayedProduce.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchDataInfo.scala PRE-CREATION 
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/KafkaApis.scala 0b668f230c8556fdf08654ce522a11847d0bf39b 
  core/src/main/scala/kafka/server/KafkaServer.scala c22e51e0412843ec993721ad3230824c0aadd2ba 
  core/src/main/scala/kafka/server/LogOffsetMetadata.scala PRE-CREATION 
  core/src/main/scala/kafka/server/OffsetManager.scala 0e22897cd1c7e45c58a61c3c468883611b19116d 
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala PRE-CREATION 
  core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 75ae1e161769a020a102009df416009bd6710f4a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 6a56a772c134dbf1e70c1bfe067223009bfdbac8 
  core/src/main/scala/kafka/server/RequestKey.scala PRE-CREATION 
  core/src/main/scala/kafka/server/RequestPurgatory.scala 3d0ff1e2dbd6a5c3587cffa283db70415af83a7b 
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c 
  core/src/test/scala/other/kafka/StressTestLog.scala 8fcd068b248688c40e73117dc119fa84cceb95b3 
  core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 9f04bd38be639cde3e7f402845dbe6ae92e87dc2 
  core/src/test/scala/unit/kafka/log/LogManagerTest.scala d03d4c4ee5c28fb1fbab2af0b84003ec0fac36e3 
  core/src/test/scala/unit/kafka/log/LogTest.scala 1da1393983d4b20330e7c7f374424edd1b26f2a3 
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 558a5d6765a2b2c2fd33dc75ed31873a133d12c9 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 2cd3a3faf7be2bbc22a892eec78c6e4c05545e18 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b349fce21e7c33cb5bd9ac80cce0b23c31e87525 
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 4f61f8469df99e02d6ce7aad897d10e158cca8fd 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b1c4ce9f66aa86c68f2e6988f67546fa63ed1f9b 

Diff: https://reviews.apache.org/r/21588/diff/


Testing
-------


Thanks,

Guozhang Wang