Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/20 20:26:49 UTC

[GitHub] [kafka] jsancio opened a new pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

jsancio opened a new pull request #9631:
URL: https://github.com/apache/kafka/pull/9631


   It is possible for the controller to send LeaderAndIsr requests with
   an ISR that contains ids not in the replica set. This is used during
   reassignment so that the partition leader doesn't add replicas back to
   the ISR. This is needed because the controller updates ZK and the
   replicas through two rounds:
   
   1. The first round of ZK updates and LeaderAndIsr requests shrinks the ISR.
   
   2. The second round of ZK updates and LeaderAndIsr requests shrinks the replica
   set.
   
   This could be avoided by doing steps 1 and 2 in a single round. Unfortunately, the
   current controller implementation makes that non-trivial.
   
   This commit changes the leader to allow the state where the ISR contains
   ids that are not in the replica set and to remove such ids from the ISR
   if required.
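   A minimal sketch of the check described above, assuming broker ids are plain `Int`s and that the replica set and ISR are modeled as `Set`s (`IsrSuperset` and its methods are hypothetical names, not Kafka's API). It only shows how a leader could spot ISR ids outside the replica set and propose an ISR without them; it ignores the lag-based check the leader also applies.

   ```scala
   // Hypothetical model, not Kafka's Partition class: broker ids are Ints.
   object IsrSuperset {
     // ISR members that are no longer part of the assigned replica set.
     def idsOutsideAssignment(replicas: Set[Int], isr: Set[Int]): Set[Int] =
       isr -- replicas

     // The ISR the leader would propose after dropping those ids.
     def withoutUnassigned(replicas: Set[Int], isr: Set[Int]): Set[Int] =
       isr intersect replicas
   }
   ```

   For example, with replicas `Set(1, 2, 3)` and ISR `Set(1, 2, 3, 4)`, `idsOutsideAssignment` returns `Set(4)` and `withoutUnassigned` returns `Set(1, 2, 3)`.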
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao merged pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

junrao merged pull request #9631:
URL: https://github.com/apache/kafka/pull/9631


   





[GitHub] [kafka] jsancio commented on a change in pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

jsancio commented on a change in pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#discussion_r529105990



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -947,9 +947,10 @@ class Partition(val topicPartition: TopicPartition,
                                   leaderEndOffset: Long,
                                   currentTimeMs: Long,
                                   maxLagMs: Long): Boolean = {
-    val followerReplica = getReplicaOrException(replicaId)
-    followerReplica.logEndOffset != leaderEndOffset &&
-      (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+    getReplica(replicaId).fold(true) { followerReplica =>

Review comment:
       Thanks for the review!
   
   > This might be ok, but is unnecessary work since the controller will be doing that soon.
   
   According to some users and the report in KAFKA-9672, under some conditions the controller appears to write to ZK that the replica was removed from the assignment but not from the ISR. I have not been able to reproduce this, or to convince myself from the code how it can happen.
   
   I was thinking of defensively letting the leader also remove the replica from the ISR so that Kafka can recover from this state. If the leader is not allowed to do this, produce requests with `acks=all` will continue to fail.
   
   What do you think @junrao?
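   A minimal sketch of the two behaviors being compared, assuming a simplified `Map[Int, FollowerState]` in place of the partition's replica lookup. `FollowerState`, `OutOfSyncCheck`, and the method names are hypothetical; only the lag condition and the `fold(true)` default come from the diff. The strict version mirrors the throwing lookup, the defensive one reports an ISR id with no replica as out of sync.

   ```scala
   // Hypothetical, simplified stand-in for the leader's view of one follower.
   final case class FollowerState(logEndOffset: Long, lastCaughtUpTimeMs: Long)

   object OutOfSyncCheck {
     // Lag rule from the diff: the follower is behind the leader's end
     // offset and has not caught up within maxLagMs.
     private def lagging(f: FollowerState, leaderEndOffset: Long,
                         currentTimeMs: Long, maxLagMs: Long): Boolean =
       f.logEndOffset != leaderEndOffset &&
         (currentTimeMs - f.lastCaughtUpTimeMs) > maxLagMs

     // Old shape: an ISR id with no replica makes the lookup throw,
     // so the check fails instead of letting the id be removed.
     def isOutOfSyncStrict(followers: Map[Int, FollowerState], replicaId: Int,
                           leaderEndOffset: Long, currentTimeMs: Long,
                           maxLagMs: Long): Boolean = {
       val f = followers.getOrElse(replicaId,
         throw new NoSuchElementException(s"replica $replicaId is not assigned"))
       lagging(f, leaderEndOffset, currentTimeMs, maxLagMs)
     }

     // New shape (fold(true)): an ISR id with no replica counts as out of
     // sync, so the leader can drop it from the ISR.
     def isOutOfSyncDefensive(followers: Map[Int, FollowerState], replicaId: Int,
                              leaderEndOffset: Long, currentTimeMs: Long,
                              maxLagMs: Long): Boolean =
       followers.get(replicaId).fold(true) { f =>
         lagging(f, leaderEndOffset, currentTimeMs, maxLagMs)
       }
   }
   ```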







[GitHub] [kafka] junrao commented on a change in pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

junrao commented on a change in pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#discussion_r532963241



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -947,9 +947,10 @@ class Partition(val topicPartition: TopicPartition,
                                   leaderEndOffset: Long,
                                   currentTimeMs: Long,
                                   maxLagMs: Long): Boolean = {
-    val followerReplica = getReplicaOrException(replicaId)
-    followerReplica.logEndOffset != leaderEndOffset &&
-      (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+    getReplica(replicaId).fold(true) { followerReplica =>

Review comment:
       @jsancio: Yes, we can keep the logic in the PR. On the leader, the ISR shrinking logic is checked every 10 secs by default. So, in the common case of completing a reassignment, the ISR reduced by the controller will be propagated to the leader before the leader's ISR shrinking logic kicks in.







[GitHub] [kafka] jsancio edited a comment on pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

jsancio edited a comment on pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#issuecomment-782469265


   @junrao I merged the latest trunk into this PR. Jenkins ran the tests, and all the failures look unrelated.





[GitHub] [kafka] jsancio commented on pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

jsancio commented on pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#issuecomment-782469265


   @junrao I merged the latest master into this PR. Jenkins ran the tests, and all the failures look unrelated.





[GitHub] [kafka] junrao commented on a change in pull request #9631: KAFKA-9672: Leader with ISR as a superset of replicas

junrao commented on a change in pull request #9631:
URL: https://github.com/apache/kafka/pull/9631#discussion_r529070546



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -947,9 +947,10 @@ class Partition(val topicPartition: TopicPartition,
                                   leaderEndOffset: Long,
                                   currentTimeMs: Long,
                                   maxLagMs: Long): Boolean = {
-    val followerReplica = getReplicaOrException(replicaId)
-    followerReplica.logEndOffset != leaderEndOffset &&
-      (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+    getReplica(replicaId).fold(true) { followerReplica =>

Review comment:
       For the reassignment case, once the controller shrinks the assigned replica set, the next step is for the controller to remove the removed replica from the ISR and bump up the leader epoch. The shrunk ISR will then be propagated to the leader. With `fold(true)`, we allow the leader to shrink the ISR immediately. This might be ok, but is unnecessary work since the controller will be doing that soon.
   
   Another option is to use `fold(false)`. This way, the leader won't shrink the removed replica out of the ISR; only the controller will.
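   To make the trade-off concrete, here is a small sketch under simplified assumptions, with a hypothetical `ShrinkPolicy.shrink` (not Kafka's `Partition` code) in which the `fold` default is a parameter. `true` lets the leader drop an ISR id that is no longer assigned; `false` leaves that id in place for the controller to remove.

   ```scala
   object ShrinkPolicy {
     // Hypothetical helper: the ISR a leader would keep after one shrink check.
     // `lagging` maps each assigned follower id to whether it is out of sync;
     // ids missing from it are no longer in the replica set, and
     // `unknownIsOutOfSync` is the default handed to Option.fold for them.
     def shrink(leaderId: Int, isr: Set[Int], lagging: Map[Int, Boolean],
                unknownIsOutOfSync: Boolean): Set[Int] =
       isr.filterNot { id =>
         // The leader never removes itself from the ISR.
         id != leaderId &&
           lagging.get(id).fold(unknownIsOutOfSync)(isLagging => isLagging)
       }
   }
   ```

   With leader 1, ISR `Set(1, 2, 3, 4)`, and replica 4 already removed from the assignment, `unknownIsOutOfSync = true` yields `Set(1, 2, 3)`, while `false` keeps `Set(1, 2, 3, 4)` until the controller's shrunk ISR arrives.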
   



