Posted to dev@kafka.apache.org by Neha Narkhede <ne...@gmail.com> on 2013/12/06 23:49:25 UTC

Review Request 16095: Patch for KAFKA-1170

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16095/
-----------------------------------------------------------

Review request for kafka.


Bugs: KAFKA-1170
    https://issues.apache.org/jira/browse/KAFKA-1170


Repository: kafka


Description
-------

Fixing unit tests that had an incorrect value for replicaFetchWaitMaxMs


KAFKA-1170: ISR can be inconsistent during partition reassignment for low-throughput partitions. Changes include:

  1) Fixing the ISR shrinking logic to remove replicas that haven't sent a fetch request for replica.lag.time.max.ms.
  2) Adding a check that replica.fetch.wait.max.ms <= replica.lag.time.max.ms, to prevent frequent ISR shrinking and protect against misconfiguration.
  3) Fixing the ISR shrink logic on the controller to always shrink the ISR without first checking the local cache, since the local cache can be behind when the leader has expanded the ISR.
  4) Fixing the state change logging to include information useful for troubleshooting.
  5) Fixing the unit tests to check whether there are extra replicas in the ISR after reassignment.
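
As a rough illustration of changes 1) and 2), the sketch below models a leader that drops followers from the ISR when they have not fetched within the lag limit, and that rejects a configuration whose fetch wait exceeds that limit. FollowerState, validate, shrinkIsr and all parameter names are hypothetical and are not taken from the patch, which touches Partition.scala and KafkaConfig.scala.

```scala
// Hypothetical, simplified model; not the actual Kafka code.
object IsrShrinkSketch {
  // Assumed per-follower bookkeeping kept by the leader.
  final case class FollowerState(brokerId: Int, lastFetchTimeMs: Long)

  // Change 2: a follower may legitimately wait up to fetchWaitMs between
  // fetches, so that wait must not exceed the lag limit.
  def validate(fetchWaitMs: Long, lagMaxMs: Long): Unit =
    require(fetchWaitMs <= lagMaxMs,
      s"fetch wait ($fetchWaitMs ms) must not exceed lag limit ($lagMaxMs ms)")

  // Change 1: followers that have not fetched within lagMaxMs leave the ISR.
  // The leader itself is never removed.
  def shrinkIsr(leaderId: Int, isr: Set[Int], followers: Seq[FollowerState],
                nowMs: Long, lagMaxMs: Long): Set[Int] = {
    val stuck = followers.collect {
      case f if isr.contains(f.brokerId) && f.brokerId != leaderId &&
        nowMs - f.lastFetchTimeMs > lagMaxMs => f.brokerId
    }.toSet
    isr -- stuck
  }
}
```

With a low-throughput partition, a follower that keeps fetching stays in the ISR even when no new messages arrive, because only the time since its last fetch is considered in this model.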


Diffs
-----

  core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala beca460dfe0f4df5ccd7f6358e44cbe742d256e5 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala c52225a80ecacead694492fb1525ae60561595a1 
  core/src/main/scala/kafka/server/KafkaConfig.scala 8f9db105898952a51b797a5c314435f6320c92d7 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 702643246939f4f734013c8a8be82f45ec34b67a 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 34e39e75e039a14a2426225c28049e72090484df 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala bab436dcef1645b5e327a5e7e68abdbe57604745 

Diff: https://reviews.apache.org/r/16095/diff/


Testing
-------


Thanks,

Neha Narkhede


Re: Review Request 16095: Patch for KAFKA-1170

Posted by Neha Narkhede <ne...@gmail.com>.

> On Dec. 6, 2013, 11:37 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/ReplicaManager.scala, line 216
> > <https://reviews.apache.org/r/16095/diff/1/?file=395217#file395217line216>
> >
> >     Shall we still keep it as one logging entry so that we know these partitions are done in one batch?

The way to correlate all partitions in the same batch is by correlation id. I separated the entries mainly because a single statement covering multiple partitions is very difficult to read while troubleshooting.
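
A minimal sketch of that idea, assuming a hypothetical PartitionUpdate type and message wording (neither is from the patch): each partition gets its own log line, and the shared correlation id is what ties the lines of one batch together.

```scala
// Hypothetical illustration; the real state change log format may differ.
object StateChangeLogSketch {
  // Assumed stand-in for one partition's entry in a batched request.
  final case class PartitionUpdate(topic: String, partition: Int, leader: Int)

  // One line per partition, each carrying the same correlation id.
  def logLines(brokerId: Int, correlationId: Int,
               updates: Seq[PartitionUpdate]): Seq[String] =
    updates.map { u =>
      "Broker %d handling request with correlation id %d: partition [%s,%d] leader = %d"
        .format(brokerId, correlationId, u.topic, u.partition, u.leader)
    }
}
```

Searching the log for one correlation id then recovers the whole batch, while searching for one partition returns only the lines that concern it.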


> On Dec. 6, 2013, 11:37 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/controller/ReplicaStateMachine.scala, line 175
> > <https://reviews.apache.org/r/16095/diff/1/?file=395215#file395215line175>
> >
> >     In this function, could we improve the logging info for the else case:
> >     
> >     "Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s" to "Cannot remove replica %d from ISR of %s since it is not in the ISR. Leader = %d ; ISR = %s"

I've included this change in a follow-up patch and will wait for more reviews before uploading it.
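
For illustration only, the more explicit message proposed above could be produced as follows. The removeFromIsr function, its signature and the Either return type are assumptions made for this sketch and do not reflect the actual ReplicaStateMachine code.

```scala
// Hypothetical sketch of the else-case message; not the controller's code.
object RemoveReplicaLogSketch {
  // Returns the shrunken ISR, or the explanatory message when the
  // replica is not a member of the ISR.
  def removeFromIsr(replicaId: Int, partition: String, leader: Int,
                    isr: List[Int]): Either[String, List[Int]] =
    if (isr.contains(replicaId)) Right(isr.filterNot(_ == replicaId))
    else Left(
      "Cannot remove replica %d from ISR of %s since it is not in the ISR. Leader = %d ; ISR = %s"
        .format(replicaId, partition, leader, isr.mkString(",")))
}
```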


- Neha


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16095/#review29930
-----------------------------------------------------------




Re: Review Request 16095: Patch for KAFKA-1170

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16095/#review29930
-----------------------------------------------------------



core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
<https://reviews.apache.org/r/16095/#comment57404>

    In this function, could we improve the logging info for the else case:
    
    "Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s" to "Cannot remove replica %d from ISR of %s since it is not in the ISR. Leader = %d ; ISR = %s"



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/16095/#comment57409>

    Shall we still keep it as one logging entry so that we know these partitions are done in one batch?



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/16095/#comment57410>

    Ditto as above



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/16095/#comment57411>

    Ditto as above



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/16095/#comment57412>

    Same here.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/16095/#comment57413>

    Same here.



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/16095/#comment57414>

    Same here



core/src/main/scala/kafka/server/ReplicaManager.scala
<https://reviews.apache.org/r/16095/#comment57415>

    Same here


- Guozhang Wang




Re: Review Request 16095: Patch for KAFKA-1170

Posted by Guozhang Wang <gu...@linkedin.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16095/#review29963
-----------------------------------------------------------

Ship it!



- Guozhang Wang




Re: Review Request 16095: Patch for KAFKA-1170

Posted by Neha Narkhede <ne...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16095/
-----------------------------------------------------------

(Updated Dec. 7, 2013, 12:22 a.m.)


Review request for kafka.


Bugs: KAFKA-1170
    https://issues.apache.org/jira/browse/KAFKA-1170


Repository: kafka


Description (updated)
-------

Incorporating Jun's and Guozhang's review comments


Fixing unit tests that had an incorrect value for replicaFetchWaitMaxMs


KAFKA-1170: ISR can be inconsistent during partition reassignment for low-throughput partitions. Changes include:

  1) Fixing the ISR shrinking logic to remove replicas that haven't sent a fetch request for replica.lag.time.max.ms.
  2) Adding a check that replica.fetch.wait.max.ms <= replica.lag.time.max.ms, to prevent frequent ISR shrinking and protect against misconfiguration.
  3) Fixing the ISR shrink logic on the controller to always shrink the ISR without first checking the local cache, since the local cache can be behind when the leader has expanded the ISR.
  4) Fixing the state change logging to include information useful for troubleshooting.
  5) Fixing the unit tests to check whether there are extra replicas in the ISR after reassignment.


Diffs (updated)
-----

  core/src/main/scala/kafka/cluster/Partition.scala 02ccc17c79b6d44c75f9bb6ca7cda8c51ae6f6fb 
  core/src/main/scala/kafka/controller/ControllerChannelManager.scala beca460dfe0f4df5ccd7f6358e44cbe742d256e5 
  core/src/main/scala/kafka/controller/KafkaController.scala 3beaf75f8285c8b6146aced2fefda4234cf1d307 
  core/src/main/scala/kafka/controller/ReplicaStateMachine.scala c52225a80ecacead694492fb1525ae60561595a1 
  core/src/main/scala/kafka/server/KafkaConfig.scala 8f9db105898952a51b797a5c314435f6320c92d7 
  core/src/main/scala/kafka/server/ReplicaManager.scala f9c7c29bb96a63d6452269f46a12751daca15b62 
  core/src/test/scala/unit/kafka/admin/AdminTest.scala c30069e837e54fb91bf1d5b75b133282a28dedf8 
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala 702643246939f4f734013c8a8be82f45ec34b67a 
  core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala 34e39e75e039a14a2426225c28049e72090484df 
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala bab436dcef1645b5e327a5e7e68abdbe57604745 

Diff: https://reviews.apache.org/r/16095/diff/


Testing
-------


Thanks,

Neha Narkhede


Re: Review Request 16095: Patch for KAFKA-1170

Posted by Jun Rao <ju...@gmail.com>.
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/16095/#review29935
-----------------------------------------------------------

Ship it!


Just one minor comment below.


core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/16095/#comment57416>

    



core/src/main/scala/kafka/controller/ControllerChannelManager.scala
<https://reviews.apache.org/r/16095/#comment57417>

    



core/src/test/scala/unit/kafka/admin/AdminTest.scala
<https://reviews.apache.org/r/16095/#comment57424>

    Could we write a separate method to share the code?
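
    One possible shape for such a shared method is sketched below. The name extraIsrReplicas, the string partition keys and the map-based inputs are hypothetical; the real tests would read the assignment and ISR from ZooKeeper rather than from in-memory maps.

```scala
// Hypothetical test helper; not the code in AdminTest.scala.
object ReassignmentCheckSketch {
  // For each partition, returns ISR members that are not in the assigned
  // replica list. An empty result means no ISR has extra replicas.
  def extraIsrReplicas(assigned: Map[String, Seq[Int]],
                       isr: Map[String, Seq[Int]]): Map[String, Set[Int]] =
    isr.map { case (tp, members) =>
      tp -> (members.toSet -- assigned.getOrElse(tp, Seq.empty[Int]))
    }.filter { case (_, extra) => extra.nonEmpty }
}
```

    Each reassignment test could then assert that the result is empty instead of repeating the comparison inline.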


- Jun Rao

