You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/21 08:49:05 UTC
[GitHub] [kafka] leonardge opened a new pull request #8524: Avoid starting election for topics where preferred leader is not in s…
leonardge opened a new pull request #8524:
URL: https://github.com/apache/kafka/pull/8524
…ync.
*More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.*
*Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.*
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
stanislavkozlovski commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r412014026
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1066,6 +1066,7 @@ class KafkaController(val config: KafkaConfig,
// do this check only if the broker is live and there are no partitions being reassigned currently
// and preferred replica election is not in progress
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
+ controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr.contains(leaderBroker) &&
Review comment:
I think we should do this check last. We also want to use an option to avoid any potential NPEs. e.g:
```
controllerContext.partitionLeadershipInfo.get(partition).forall(...)
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
leonardge commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r415085874
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ isPreferredLeaderInSync(tp)
+ )
onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
}
}
}
+ private def isPreferredLeaderInSync(tp: TopicPartition): Boolean = {
Review comment:
Done!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
leonardge commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r416186838
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ canPreferredReplicaBeLeader(tp)
+ )
onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
}
}
}
+ private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = {
+ val assignment = controllerContext.partitionReplicaAssignment(tp)
+ val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp))
Review comment:
Sure thing!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
leonardge commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r414467090
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,7 +1068,12 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(
+ controllerContext.partitionReplicaAssignment(tp),
+ controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr,
+ controllerContext.liveBrokerIds.toSet).nonEmpty
Review comment:
Done! And after using the mentioned implementation the code block gets cluttered so I extracted it into a helper method.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r414740034
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ isPreferredLeaderInSync(tp)
+ )
onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
}
}
}
+ private def isPreferredLeaderInSync(tp: TopicPartition): Boolean = {
Review comment:
Perhaps a more accurate name is canPreferredReplicaBeLeader()?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
leonardge commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r412857024
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1066,6 +1066,7 @@ class KafkaController(val config: KafkaConfig,
// do this check only if the broker is live and there are no partitions being reassigned currently
// and preferred replica election is not in progress
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
+ controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr.contains(leaderBroker) &&
Review comment:
Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r414042238
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,7 +1068,12 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection(
+ controllerContext.partitionReplicaAssignment(tp),
+ controllerContext.partitionLeadershipInfo(tp).leaderAndIsr.isr,
+ controllerContext.liveBrokerIds.toSet).nonEmpty
Review comment:
In Election.leaderForPreferredReplica(), liveReplicas is computed as the following. So, we probably want to be consistent here.
`val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, partition))`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] junrao commented on pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
junrao commented on pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#issuecomment-620110244
ok to test
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] ijuma commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r416151605
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ canPreferredReplicaBeLeader(tp)
+ )
onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
}
}
}
+ private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = {
+ val assignment = controllerContext.partitionReplicaAssignment(tp)
+ val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp))
Review comment:
We also do `controllerContext.isReplicaOnline(leaderBroker, tp)` in the caller. Do we also need it here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r413436495
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,7 +1068,9 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ controllerContext.partitionLeadershipInfo.get(tp).forall(l => l.leaderAndIsr.isr.contains(leaderBroker))
Review comment:
The preferred leader election also checks for live brokers. So, perhaps we could just call PartitionLeaderElectionAlgorithms.preferredReplicaPartitionLeaderElection() here.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] stanislavkozlovski commented on issue #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
stanislavkozlovski commented on issue #8524:
URL: https://github.com/apache/kafka/pull/8524#issuecomment-617770854
After discussing online, we figured there isn't an easy way to test this scenario. There's significant work to be done to make KafkaController unit test-able
Good catch on the `testLeaderAndIsrWhenEntireIsrOfflineAndUncleanLeaderElectionEnabled`!
I think this change looks good. Let's wait for @hachikuji or @junrao to take a look.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] junrao commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r416164863
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,12 +1068,23 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ canPreferredReplicaBeLeader(tp)
+ )
onReplicaElection(candidatePartitions.toSet, ElectionType.PREFERRED, AutoTriggered)
}
}
}
+ private def canPreferredReplicaBeLeader(tp: TopicPartition): Boolean = {
+ val assignment = controllerContext.partitionReplicaAssignment(tp)
+ val liveReplicas = assignment.filter(replica => controllerContext.isReplicaOnline(replica, tp))
Review comment:
Good point. We can get rid of the isReplicaOnline() check in the caller. @leonardge : Could you summit a followup minor PR?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [kafka] leonardge commented on a change in pull request #8524: KAFKA-9866: Avoid election for topics where preferred leader is not in ISR
Posted by GitBox <gi...@apache.org>.
leonardge commented on a change in pull request #8524:
URL: https://github.com/apache/kafka/pull/8524#discussion_r414012223
##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1068,7 +1068,9 @@ class KafkaController(val config: KafkaConfig,
val candidatePartitions = topicsNotInPreferredReplica.keys.filter(tp => controllerContext.isReplicaOnline(leaderBroker, tp) &&
controllerContext.partitionsBeingReassigned.isEmpty &&
!topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) &&
- controllerContext.allTopics.contains(tp.topic))
+ controllerContext.allTopics.contains(tp.topic) &&
+ controllerContext.partitionLeadershipInfo.get(tp).forall(l => l.leaderAndIsr.isr.contains(leaderBroker))
Review comment:
Done!
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org