Posted to dev@kafka.apache.org by Aditya Auradkar <aa...@linkedin.com> on 2015/03/12 02:36:10 UTC

Review Request 31967: Patch for KAFKA-1546

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description
-------

Patch for KAFKA-1546


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review78018
-----------------------------------------------------------

Ship it!


Thanks for the patch. +1. Just a couple of minor comments below.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment126396>

    LEO => leo



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment126397>

    Stuck replicas => Lagging replicas


- Jun Rao


On March 27, 2015, 12:44 a.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 27, 2015, 12:44 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Addressing Jun and Guozhang's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review78166
-----------------------------------------------------------

Ship it!


Ship It!

- Neha Narkhede


On March 27, 2015, 6:58 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 27, 2015, 6:58 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Addressing Jun and Guozhang's comments
> 
> 
> Addressing Jun's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 422451aec5ea0442eb2e4c1ae772885b813904a9 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 27, 2015, 6:58 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546


PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion
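
As a rough illustration of the first two bullets, the Scala sketch below shows one way a time-based lag marker could work. It is not the code from the patch: `FollowerLag`, `onFetch`, `lagMs`, and `inSync` are hypothetical names, and the sketch only covers a follower that keeps fetching without reaching the LEO (a follower that stops fetching entirely would need a separate check).

```scala
// Hypothetical sketch; names are illustrative and not taken from the patch.
object LagBeginSketch {
  // Records when a follower first failed to read up to the leader's log end offset (LEO).
  final class FollowerLag {
    // None means the follower is currently caught up.
    private var lagBeginTimeMs: Option[Long] = None

    // Called after each fetch. readToLeo is true if the fetch reached the LEO
    // that was captured before the read started.
    def onFetch(readToLeo: Boolean, nowMs: Long): Unit = {
      if (readToLeo) lagBeginTimeMs = None                        // caught up: clear the marker
      else if (lagBeginTimeMs.isEmpty) lagBeginTimeMs = Some(nowMs) // lag starts now
    }

    // Time spent lagging so far, or 0 if the follower is caught up.
    def lagMs(nowMs: Long): Long = lagBeginTimeMs.map(nowMs - _).getOrElse(0L)
  }

  // A follower counts as in sync while its time lag is within maxLagMs.
  def inSync(follower: FollowerLag, nowMs: Long, maxLagMs: Long): Boolean =
    follower.lagMs(nowMs) <= maxLagMs
}
```

Because the marker is set only on the first lagging fetch, later lagging fetches do not reset the clock; only a fetch that reaches the LEO clears it.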

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Addressing Joel's comments


Addressing Jun and Guozhang's comments


Addressing Jun's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 422451aec5ea0442eb2e4c1ae772885b813904a9 
  core/src/main/scala/kafka/server/ReplicaManager.scala 44f0026e6676d8d764dd59dbcc6bb7bb727a3ba6 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 27, 2015, 12:44 a.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546


PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Addressing Joel's comments


Addressing Jun and Guozhang's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.

> On March 26, 2015, 12:07 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/cluster/Replica.scala, lines 52-72
> > <https://reviews.apache.org/r/31967/diff/6/?file=905875#file905875line52>
> >
> >     Would it be simpler to instead keep track of a lastCaughtUpTime and update it every time readToEndOfEnd is true? If readToEndOfEnd is false, we will just leave that value untouched.

Good suggestion. I'll update the patch.


- Aditya


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review77810
-----------------------------------------------------------


On March 25, 2015, 8:27 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 25, 2015, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review77810
-----------------------------------------------------------


Thanks for the new patch. A couple more comments.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment126109>

    I think the condition for both cases can now be combined.
    
    If we maintain a lastCaughtUpTime (see the comment below) in Replica, then a replica r is out of sync if currentTime - r.lastCaughtUpTime > maxLagMs.
    
    We can probably also get rid of Replica.logEndOffsetUpdateTimeMs.
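
The combined condition suggested above can be sketched in Scala as follows. This is a minimal illustration under stated assumptions, not the patch itself: `FollowerState` is a hypothetical stand-in for Kafka's `Replica`, and `outOfSync` is an invented helper name.

```scala
// Hypothetical sketch of the single out-of-sync condition; not the patch's code.
object OutOfSyncSketch {
  // Minimal stand-in for a follower replica (assumption, not Kafka's Replica class).
  final case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

  // Followers whose last caught-up time lies more than maxLagMs in the past.
  def outOfSync(followers: Set[FollowerState], nowMs: Long, maxLagMs: Long): Set[FollowerState] =
    followers.filter(r => nowMs - r.lastCaughtUpTimeMs > maxLagMs)
}
```

One condition covers both cases because a follower that stops fetching never refreshes its timestamp, and a follower that fetches but never reaches the log end does not refresh it either.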



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/31967/#comment126099>

    Would it be simpler to instead keep track of a lastCaughtUpTime and update it every time readToEndOfEnd is true? If readToEndOfEnd is false, we will just leave that value untouched.
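
A minimal Scala sketch of this update rule, assuming a simplified replica class: `FollowerReplica`, `updateAfterFetch`, and the `readToEndOfLog` parameter (standing in for the flag the comment calls readToEndOfEnd) are hypothetical names, not identifiers confirmed by the patch.

```scala
// Hypothetical sketch; class and method names are illustrative only.
object LastCaughtUpSketch {
  final class FollowerReplica(initialTimeMs: Long) {
    // Volatile because fetch handling and ISR checks may run on different threads.
    @volatile private var lastCaughtUpTimeMsUnderlying: Long = initialTimeMs

    def lastCaughtUpTimeMs: Long = lastCaughtUpTimeMsUnderlying

    // Refresh the timestamp only when the fetch read to the end of the log;
    // otherwise leave the previous value untouched.
    def updateAfterFetch(readToEndOfLog: Boolean, nowMs: Long): Unit =
      if (readToEndOfLog) lastCaughtUpTimeMsUnderlying = nowMs
  }
}
```

Compared with recording when lag began, this keeps a single timestamp that needs no reset logic.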


- Jun Rao


On March 25, 2015, 8:27 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 25, 2015, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.

> On March 25, 2015, 9:25 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/cluster/Replica.scala, lines 60-72
> > <https://reviews.apache.org/r/31967/diff/6/?file=905875#file905875line60>
> >
> >     Instead of passing these fields of logReadResult all the way up to the replica, I think we can just push this comparison logic down to replicaManager.readFromLocalLog around line 507, and just keep a boolean in the logReadResult like isReadFromLogEnd. That value can then be passed all the way up to Replica as something like isLEOCaughtUp.
> >
> >     Also we can change the names of updateReplicaLEO and updateFollowerLEOs as they now carry the isLEOCaughtUp information as well as LEOs.

I initially started with a boolean and later decided to pass the LogOffsetMetadata directly. I agree that it is a bit simpler to pass the boolean value so I'm making these changes.


- Aditya


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review77799
-----------------------------------------------------------


On March 25, 2015, 8:27 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 25, 2015, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Guozhang Wang <wa...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review77799
-----------------------------------------------------------



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/31967/#comment126078>

    Instead of passing these fields of logReadResult all the way up to the replica, I think we can just push this comparison logic down to replicaManager.readFromLocalLog around line 507, and just keep a boolean in the logReadResult like isReadFromLogEnd. That value can then be passed all the way up to Replica as something like isLEOCaughtUp.

    Also we can change the names of updateReplicaLEO and updateFollowerLEOs as they now carry the isLEOCaughtUp information as well as LEOs.
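
The idea of computing the flag at read time can be sketched in Scala as below. Everything here is an assumption made for illustration: `ToyLog` and `ReadResult` are simplified stand-ins for the real log and LogReadResult, and the comparison used (the fetch offset has reached the LEO snapshot taken before the read) is one plausible reading of the comment, not confirmed code.

```scala
// Hypothetical sketch with toy types; not Kafka's Log or LogReadResult.
object ReadFromLogEndSketch {
  // Simplified stand-in for the result of a local log read.
  final case class ReadResult(messages: Vector[String], isReadFromLogEnd: Boolean)

  // Toy in-memory log: offset i holds entries(i); the log end offset is entries.size.
  final case class ToyLog(entries: Vector[String]) {
    def logEndOffset: Long = entries.size.toLong
    def read(fetchOffset: Long, maxMessages: Int): Vector[String] =
      entries.slice(fetchOffset.toInt, fetchOffset.toInt + maxMessages)
  }

  def readFromLocalLog(log: ToyLog, fetchOffset: Long, maxMessages: Int): ReadResult = {
    // Snapshot the LEO before reading so appends during the read do not affect the flag.
    val leoBeforeRead = log.logEndOffset
    val messages = log.read(fetchOffset, maxMessages)
    // Only a boolean travels upward, instead of the offset metadata itself.
    ReadResult(messages, isReadFromLogEnd = fetchOffset >= leoBeforeRead)
  }
}
```

With the comparison done here, callers further up only need the boolean to decide whether the follower has caught up.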


- Guozhang Wang


On March 25, 2015, 8:27 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 25, 2015, 8:27 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Addressing Joel's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 25, 2015, 8:27 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description
-------

PATCH for KAFKA-1546


PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Addressing Joel's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 17, 2015, 9:46 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546


PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Addressing Joel's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 46d21c73f1feb3410751899380b35da0c37c975c 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 191251d1340b5e5b2d649b37af3c6c1896d07e6e 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Joel Koshy <jj...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76641
-----------------------------------------------------------


lgtm overall. Minor comments below.


core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
<https://reviews.apache.org/r/31967/#comment124277>

    should be not be -> can you fix/remove?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124224>

    Wonder why this got split. Can you re-optimize imports?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124225>

    Can you move the if statement to the next line



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124243>

    Can we rename the argument to maxLagMs?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124245>

    Minor edit:
    "has not read up to the LEO within the last replicaMaxLag ms, then the follower is lagging and should be removed from the ISR"



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124251>

    (Not part of your change, but could you change [%s,%d] to %s and replace topic, partitionId with TopicAndPartition(topic, partitionId)? We are trying to adopt a uniform convention everywhere in printing topic-partition and have been making these changes gradually, as they appear.)
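
A small Scala sketch of the requested formatting change. The `TopicAndPartition` case class here is a local stand-in that is assumed to render as "[topic,partition]", and the log message text is invented for illustration.

```scala
// Hypothetical sketch; TopicAndPartition is a local stand-in, not an import from Kafka.
object TopicPartitionLogging {
  // Assumed to print as "[topic,partition]" so the output matches the old format string.
  final case class TopicAndPartition(topic: String, partition: Int) {
    override def toString: String = "[%s,%d]".format(topic, partition)
  }

  // Before: the bracket format is spelled out at every call site.
  def before(topic: String, partitionId: Int): String =
    "Shrinking ISR for partition [%s,%d]".format(topic, partitionId)

  // After: a single %s, with the formatting owned by TopicAndPartition.
  def after(topic: String, partitionId: Int): String =
    "Shrinking ISR for partition %s".format(TopicAndPartition(topic, partitionId))
}
```

Keeping the format in one place means a later change to how topic-partitions are printed touches only `toString`.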



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124252>

    same here



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/31967/#comment124256>

    Can you rename this to lagBeginTimeMsUnderlying?



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/31967/#comment124254>

    read up to the log end offset snapshot when the read was initiated ...



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/31967/#comment124260>

    Can we rename this to logEndOffsetBeforeRead?
    
    Also, can we just do with the Long (offset) instead of the entire LogOffsetMetadata?


- Joel Koshy


On March 16, 2015, 6:32 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 16, 2015, 6:32 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76781
-----------------------------------------------------------



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/31967/#comment124449>

    Renamed.
    
    We can use the raw offset directly, but in general the code is passing the LogOffsetMetadata objects and it made sense to remain consistent.


- Aditya Auradkar


On March 16, 2015, 6:32 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 16, 2015, 6:32 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> Updated KAFKA-1546 patch to reflect Neha and Jun's comments
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
>   core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 16, 2015, 6:32 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion
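
One possible reading of the lag-begin tracking summarized above, as a minimal sketch. FollowerLag and onFetch are hypothetical names invented for this example, not the patch's actual code. The sketch assumes that -1 marks a caught-up follower and that the leader's LEO is snapshotted before the read, so that appends arriving during the read do not make a caught-up follower look behind.

```scala
object LagBeginSketch {
  // Assumed sentinel: -1 means the follower is currently caught up.
  val CaughtUp: Long = -1L

  // Hypothetical minimal follower state; not the real kafka.cluster.Replica.
  final class FollowerLag {
    private var lagBeginTimeMs: Long = CaughtUp

    def lagBegin: Long = lagBeginTimeMs

    // fetchOffset: the offset the follower asked to read from.
    // logEndOffsetBeforeRead: the leader's LEO snapshot taken before the read.
    def onFetch(fetchOffset: Long, logEndOffsetBeforeRead: Long, nowMs: Long): Unit = {
      if (fetchOffset >= logEndOffsetBeforeRead) {
        // The follower read up to the LEO snapshot: stop the clock.
        lagBeginTimeMs = CaughtUp
      } else if (lagBeginTimeMs == CaughtUp) {
        // First fetch that fell short of the LEO: start the clock.
        lagBeginTimeMs = nowMs
      }
      // Otherwise the follower is still behind and the original start time is kept.
    }
  }
}
```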

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 16, 2015, 6:31 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion

Updated KAFKA-1546 patch to reflect Neha and Jun's comments


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/FetchDataInfo.scala 26f278f9b75b1c9c83a720ca9ebd8ab812d19d39 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76511
-----------------------------------------------------------



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124067>

    General comment for changes to this file: The lines are longer than 80 in many cases. Can you please fix that?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124073>

    scala.Some is unused.



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124074>

    Let's rename FetchDataInfo.fetchOffset to fetchOffsetMetadata. It is confusing to read ...fetchOffset.messageOffset.



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124068>

    Remove space after ! in all relevant places in this patch?



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124075>

    This comment needs an update to match the logic in your patch.



core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/31967/#comment124069>

    It would be clearer to handle the UnknownLogReadResult case explicitly. As written, it is hard to read and to convince oneself that the case where the broker becomes a leader doesn't set the lagBeginValue to the current time. In addition to adding that condition explicitly, can you also add a comment explaining the significance of the check?



core/src/main/scala/kafka/server/KafkaConfig.scala
<https://reviews.apache.org/r/31967/#comment124066>

    minor nit: hasnt => hasn't
    Also, how about "hasn't consumed up to the leader's log end offset for at least this time"
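
The time-based rule that this config description refers to could be sketched roughly as follows. FollowerState, isLagging and shrinkIsr are hypothetical names for illustration only. The sketch assumes that lagBeginTimeMs is -1 while the follower is caught up, and that maxLagMs is the configured maximum lag time.

```scala
object IsrShrinkSketch {
  // Hypothetical stand-in for a follower replica; lagBeginTimeMs is assumed
  // to be -1 while caught up, otherwise the time at which the lag started.
  final case class FollowerState(brokerId: Int, lagBeginTimeMs: Long)

  val CaughtUp: Long = -1L

  // A follower is lagging if it has not read up to the leader's LEO
  // within the last maxLagMs milliseconds.
  def isLagging(f: FollowerState, nowMs: Long, maxLagMs: Long): Boolean =
    f.lagBeginTimeMs != CaughtUp && (nowMs - f.lagBeginTimeMs) > maxLagMs

  // Drop every lagging follower from the in-sync set.
  def shrinkIsr(isr: Set[FollowerState], nowMs: Long, maxLagMs: Long): Set[FollowerState] =
    isr.filterNot(isLagging(_, nowMs, maxLagMs))
}
```

Note that no message-count threshold appears anywhere in the check; only elapsed time matters.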



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/31967/#comment124070>

    Please break the long lines. Makes it harder to read and maintain the code



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/31967/#comment124071>

    ditto



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/31967/#comment124072>

    ditto


- Neha Narkhede


On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 12, 2015, 8:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76571
-----------------------------------------------------------


Working on the remaining comments. Will update the RB.


core/src/main/scala/kafka/cluster/Replica.scala
<https://reviews.apache.org/r/31967/#comment124149>

    It isn't required to set the lagBeginValue to the current time, because the follower will make a fetch request during that time frame, which may or may not read from the log end offset, and that will start the clock. But your suggestion makes it cleaner, so I'll change it.


- Aditya Auradkar


On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 12, 2015, 8:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76517
-----------------------------------------------------------


Thanks for the patch. A few comments below.


core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124082>

    This is actually not a reliable check for ISR expansion. Consider a case where there are 3 replicas a, b, and c. Suppose that a and b are in the ISR and c is not. At some point, c fully catches up to the LEO. Its lagBeginTimeMs is set to -1 and we are about to call maybeExpandIsr(). Before that happens, a and b both advance their LEOs and the HW also advances. Now, we do the check in maybeExpandIsr() and add c to the ISR. However, c is now missing the messages between the LEO it saw and the new HW.
    
    The original check is more reliable for isr expansion.
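
The race described above can be made concrete with a small sketch. The names here (Follower, canExpandByLagOnly, canExpandByOffset) are hypothetical, and the sketch assumes that the "original check" compares the follower's LEO against the leader's current HW; that assumption should be confirmed against Partition.scala.

```scala
object IsrExpandSketch {
  // Hypothetical, simplified view of a follower as seen by the leader.
  final case class Follower(brokerId: Int, logEndOffset: Long, lagBeginTimeMs: Long)

  // The check questioned in the review: relies only on the lag clock, which
  // may have been reset before the leader's HW moved on.
  def canExpandByLagOnly(f: Follower): Boolean =
    f.lagBeginTimeMs == -1L

  // Offset-based check (assumed shape of the original one): the follower must
  // have reached the leader's current high watermark at the time of the check.
  def canExpandByOffset(f: Follower, leaderHighWatermark: Long): Boolean =
    f.logEndOffset >= leaderHighWatermark
}
```

For example, if c caught up at offset 100 and the HW then advanced to 120, the lag-only check would still admit c, while the offset-based check would reject it until c fetches up to 120.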



core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
<https://reviews.apache.org/r/31967/#comment124083>

    The comment says 10, which is inconsistent with the code change below.



core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
<https://reviews.apache.org/r/31967/#comment124084>

    "catch up to only 10" is no longer accurate.


- Jun Rao


On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 12, 2015, 8:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/#review76585
-----------------------------------------------------------



core/src/main/scala/kafka/cluster/Partition.scala
<https://reviews.apache.org/r/31967/#comment124158>

    Interesting point. I thought that it would be enough to simply check the lag value. But yes, this will cause the HW to be inconsistent.


- Aditya Auradkar


On March 12, 2015, 8:42 p.m., Aditya Auradkar wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/31967/
> -----------------------------------------------------------
> 
> (Updated March 12, 2015, 8:42 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1546
>     https://issues.apache.org/jira/browse/KAFKA-1546
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> PATCH for KAFKA-1546
> 
> Brief summary of changes:
> - Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
> - Using lag begin value in the check for ISR expand and shrink
> - Removed the max lag messages config since it is no longer necessary
> - Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
> - Unit test cases to test ISR shrinkage and expansion
> 
> 
> Diffs
> -----
> 
>   core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
>   core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
>   core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
>   core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
>   core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
>   core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
>   core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
>   core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 
> 
> Diff: https://reviews.apache.org/r/31967/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Aditya Auradkar
> 
>


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 12, 2015, 8:42 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 12, 2015, 8:42 p.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 12, 2015, 3:17 a.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546

Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 12, 2015, 1:48 a.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

PATCH for KAFKA-1546


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar


Re: Review Request 31967: Patch for KAFKA-1546

Posted by Aditya Auradkar <aa...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/31967/
-----------------------------------------------------------

(Updated March 12, 2015, 1:39 a.m.)


Review request for kafka.


Bugs: KAFKA-1546
    https://issues.apache.org/jira/browse/KAFKA-1546


Repository: kafka


Description (updated)
-------

Patch for KAFKA-1546
Brief summary of changes:
- Added a lagBegin metric inside Replica to track the lag in terms of time since the replica did not read from the LEO
- Using lag begin value in the check for ISR expand and shrink
- Removed the max lag messages config since it is no longer necessary
- Returning the initialLogEndOffset in LogReadResult corresponding to the LEO before actually reading from the log.
- Unit test cases to test ISR shrinkage and expansion


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala c4bf48a801007ebe7497077d2018d6dffe1677d4 
  core/src/main/scala/kafka/cluster/Replica.scala bd13c20338ce3d73113224440e858a12814e5adb 
  core/src/main/scala/kafka/log/Log.scala 06b8ecc5d11a1acfbaf3c693c42bf3ce5b2cd86d 
  core/src/main/scala/kafka/server/KafkaConfig.scala 48e33626695ad8a28b0018362ac225f11df94973 
  core/src/main/scala/kafka/server/ReplicaManager.scala c5274822c57bf3c1f9e4135c0bdcaa87ee50ce20 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 92152358c95fa9178d71bd1c079af0a0bd8f1da8 
  core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c124c8df5b5079e5ffbd0c4ea359562a66aaf317 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 92d6b2c672f74cdd526f2e98da8f7fb3696a88e3 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala efb457334bd87e4c5b0dd2c66ae1995993cd0bc1 

Diff: https://reviews.apache.org/r/31967/diff/


Testing
-------


Thanks,

Aditya Auradkar