Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/10 01:40:13 UTC

[GitHub] [kafka] hachikuji opened a new pull request, #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

hachikuji opened a new pull request, #12499:
URL: https://github.com/apache/kafka/pull/12499

   It is currently possible for a leader to send an `AlterPartition` request to a stale controller that has not yet seen the latest leader epoch, which the leader learned through a `LeaderAndIsr` request from a newer controller. In this case, the stale controller returns `FENCED_LEADER_EPOCH`, which causes the partition leader to get stuck. This is a change in behavior following https://github.com/apache/kafka/pull/12032. Prior to that patch, the request would either have been accepted (potentially incorrectly) if the `LeaderAndIsr` state matched that on the controller, or it would have returned `NOT_CONTROLLER`.
   
   This patch fixes the problem by ensuring that `AlterPartition` is sent to a controller with an epoch which is at least as large as that of the controller which sent the `LeaderAndIsr` request. This ensures that the `FENCED_LEADER_EPOCH` error from the controller can be trusted.
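
As a rough illustration of the gating rule described above, here is a minimal sketch. The names `EpochGateSketch`, `ActiveController`, `QueuedRequest`, `canSend`, and `nextSendable` are hypothetical stand-ins for this example and are not the classes in the patch; only the comparison `activeController.epoch >= request.minControllerEpoch` mirrors the diff under review.

```scala
object EpochGateSketch {
  // Hypothetical stand-ins for illustration; not the actual Kafka classes.
  final case class ActiveController(nodeId: Int, epoch: Int)
  final case class QueuedRequest(name: String, minControllerEpoch: Int)

  // A request may be sent only when a controller is known and its epoch is
  // at least the minimum epoch the request requires.
  def canSend(active: Option[ActiveController], request: QueuedRequest): Boolean =
    active.exists(_.epoch >= request.minControllerEpoch)

  // Returns the first request in queue order that is allowed to go out.
  // Requests that need a newer controller simply stay queued (assumption:
  // they are retried later, once a newer controller is discovered).
  def nextSendable(active: Option[ActiveController],
                   queue: Seq[QueuedRequest]): Option[QueuedRequest] =
    queue.find(canSend(active, _))
}
```

With this rule, an `AlterPartition` request created after a `LeaderAndIsr` from controller epoch 5 would wait in the queue while the broker still believes the active controller has epoch 4.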
   
   A more elegant solution to this problem would probably be to include the controller epoch in the `AlterPartition` request, but this would require a version bump. Alternatively, we considered letting the controller return `UNKNOWN_LEADER_EPOCH` instead of `FENCED_LEADER_EPOCH` when the epoch is larger than what it has in its context. This too would likely require a version bump. Finally, we considered reverting https://github.com/apache/kafka/pull/12032, which would restore the looser validation logic that allows the controller to accept `AlterPartition` requests with larger leader epochs. We rejected this option because we feel it can lead to correctness violations.
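
To make the `UNKNOWN_LEADER_EPOCH` alternative concrete, the following sketch contrasts two controller-side checks. It is an assumption-laden simplification: `LeaderEpochValidationSketch`, `strict`, and `withUnknown` are hypothetical names, and `strict` only approximates the behavior this description attributes to the controller (a larger leader epoch is answered with `FENCED_LEADER_EPOCH`); it is not the controller's actual validation code.

```scala
object LeaderEpochValidationSketch {
  sealed trait Result
  case object Accepted extends Result
  case object FencedLeaderEpoch extends Result
  case object UnknownLeaderEpoch extends Result

  // Assumed simplification of the strict check: any mismatch, including a
  // request epoch newer than the controller's, is reported as fenced.
  def strict(controllerLeaderEpoch: Int, requestLeaderEpoch: Int): Result =
    if (requestLeaderEpoch == controllerLeaderEpoch) Accepted
    else FencedLeaderEpoch

  // The alternative considered: distinguish "request is older" (fenced)
  // from "request is newer than anything this controller knows" (unknown),
  // so a leader could tell a stale controller from a genuine fencing.
  def withUnknown(controllerLeaderEpoch: Int, requestLeaderEpoch: Int): Result =
    if (requestLeaderEpoch < controllerLeaderEpoch) FencedLeaderEpoch
    else if (requestLeaderEpoch > controllerLeaderEpoch) UnknownLeaderEpoch
    else Accepted
}
```

Under the second variant a leader could treat the "unknown" answer as retriable, but since brokers on the existing request version would not expect that error, the description notes it would likely need a version bump.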
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] hachikuji closed pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
hachikuji closed pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller
URL: https://github.com/apache/kafka/pull/12499



[GitHub] [kafka] andymg3 commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
andymg3 commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r941967874


##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -364,15 +423,15 @@ class BrokerToControllerRequestThread(
   }
 
   override def doWork(): Unit = {
-    if (activeControllerAddress().isDefined) {
+    if (activeControllerOpt().isDefined) {
       super.pollOnce(Long.MaxValue)
     } else {
       debug("Controller isn't cached, looking for local metadata changes")
       controllerNodeProvider.get() match {
-        case Some(controllerNode) =>
-          info(s"Recorded new controller, from now on will use broker $controllerNode")
-          updateControllerAddress(controllerNode)
-          metadataUpdater.setNodes(Seq(controllerNode).asJava)
+        case Some(controllerNodeAndEpoch) =>

Review Comment:
   Is this where/how eventually the `LeaderAndIsr` from the new controller change gets applied? 




[GitHub] [kafka] hachikuji commented on pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12499:
URL: https://github.com/apache/kafka/pull/12499#issuecomment-1212226476

   I am going to close this PR. On the one hand, it does not address the problem for KRaft; on the other, we have thought of a simpler fix for the zk controller, which I will open shortly.



[GitHub] [kafka] hachikuji commented on pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on PR #12499:
URL: https://github.com/apache/kafka/pull/12499#issuecomment-1211245795

   > So we prevent sending the request if the epoch is lower? And is it the case, that there is always a controller with an epoch at least as large? Or in some cases would we need to wait/retry until such a controller exists?
   
   @jolshan Yes, that is right. Ensuring some level of monotonicity seems like a good general change even outside the original bug. It is weird to allow the broker to send requests to a controller that it knows for sure is stale, and it makes the system harder to reason about. 
   
   One thing I have been trying to think through is how this bug affects KRaft. The KRaft controller will also return `FENCED_LEADER_EPOCH` if the leader epoch is higher than what it has in its cache. But does KRaft give us a stronger guarantee to work with? I think it could, but at the moment, we do not proactively reset the controller in `BrokerToControllerChannelManager` after we discover a new controller. We only reset it after we receive a `NOT_CONTROLLER` error in a request. So it seems to me that we could still hit the same problem with KRaft. As a matter of fact, I think this patch does not fix the KRaft problem because we do not propagate the controller epoch down to `Partition` so that it can be used in `AlterPartition` requests. cc @jsancio
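
The difference between resetting the cached controller reactively and proactively can be sketched as follows. This is a toy model under stated assumptions: `ControllerCacheSketch`, `ReactiveCache`, and `ProactiveCache` are hypothetical names, and the real `BrokerToControllerChannelManager` is considerably more involved.

```scala
object ControllerCacheSketch {
  final case class Controller(nodeId: Int, epoch: Int)

  // Reactive: the cached controller is dropped only after a NOT_CONTROLLER
  // error, so a newly discovered controller is ignored while one is cached.
  final class ReactiveCache(initial: Controller) {
    private var cached: Option[Controller] = Some(initial)
    def current: Option[Controller] = cached
    def onNotControllerError(): Unit = { cached = None }
    def onControllerDiscovered(c: Controller): Unit = {
      if (cached.isEmpty) cached = Some(c)
    }
  }

  // Proactive (hypothetical): switch as soon as a controller with a larger
  // epoch is discovered, without waiting for an error response.
  final class ProactiveCache(initial: Controller) {
    private var cached: Option[Controller] = Some(initial)
    def current: Option[Controller] = cached
    def onNotControllerError(): Unit = { cached = None }
    def onControllerDiscovered(c: Controller): Unit = {
      if (cached.forall(_.epoch < c.epoch)) cached = Some(c)
    }
  }
}
```

In the reactive variant, a request can still be routed to the old controller after a newer one is known locally, which is the window in which a misleading `FENCED_LEADER_EPOCH` could come back.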



[GitHub] [kafka] jolshan commented on pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
jolshan commented on PR #12499:
URL: https://github.com/apache/kafka/pull/12499#issuecomment-1210963384

   > This patch fixes the problem by ensuring that AlterPartition is sent to a controller with an epoch which is at least as large as that of the controller which sent the LeaderAndIsr request. This ensures that the FENCED_LEADER_EPOCH error from the controller can be trusted.
   
   So we prevent sending the request if the epoch is lower? And is it the case, that there is always a controller with an epoch at least as large? Or in some cases would we need to wait/retry until such a controller exists?



[GitHub] [kafka] hachikuji commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
hachikuji commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r942874759


##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -317,21 +373,24 @@ class BrokerToControllerRequestThread(
   override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
     val currentTimeMs = time.milliseconds()
     val requestIter = requestQueue.iterator()
+    val controllerOpt = activeControllerOpt()
+
     while (requestIter.hasNext) {
       val request = requestIter.next
       if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
         requestIter.remove()
         request.callback.onTimeout()
       } else {
-        val controllerAddress = activeControllerAddress()
-        if (controllerAddress.isDefined) {
-          requestIter.remove()
-          return Some(RequestAndCompletionHandler(
-            time.milliseconds(),
-            controllerAddress.get,
-            request.request,
-            handleResponse(request)
-          ))
+        controllerOpt.foreach { activeController =>
+          if (activeController.epoch >= request.minControllerEpoch) {

Review Comment:
   Yes, that's right.




[GitHub] [kafka] andymg3 commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
andymg3 commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r941967874


##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -364,15 +423,15 @@ class BrokerToControllerRequestThread(
   }
 
   override def doWork(): Unit = {
-    if (activeControllerAddress().isDefined) {
+    if (activeControllerOpt().isDefined) {
       super.pollOnce(Long.MaxValue)
     } else {
       debug("Controller isn't cached, looking for local metadata changes")
       controllerNodeProvider.get() match {
-        case Some(controllerNode) =>
-          info(s"Recorded new controller, from now on will use broker $controllerNode")
-          updateControllerAddress(controllerNode)
-          metadataUpdater.setNodes(Seq(controllerNode).asJava)
+        case Some(controllerNodeAndEpoch) =>

Review Comment:
   Is this where/how eventually the `LeaderAndIsr` from the new controller gets applied? 




[GitHub] [kafka] andymg3 commented on a diff in pull request #12499: KAFKA-14154; Ensure AlterPartition not sent to stale controller

Posted by GitBox <gi...@apache.org>.
andymg3 commented on code in PR #12499:
URL: https://github.com/apache/kafka/pull/12499#discussion_r941967046


##########
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala:
##########
@@ -317,21 +373,24 @@ class BrokerToControllerRequestThread(
   override def generateRequests(): Iterable[RequestAndCompletionHandler] = {
     val currentTimeMs = time.milliseconds()
     val requestIter = requestQueue.iterator()
+    val controllerOpt = activeControllerOpt()
+
     while (requestIter.hasNext) {
       val request = requestIter.next
       if (currentTimeMs - request.createdTimeMs >= retryTimeoutMs) {
         requestIter.remove()
         request.callback.onTimeout()
       } else {
-        val controllerAddress = activeControllerAddress()
-        if (controllerAddress.isDefined) {
-          requestIter.remove()
-          return Some(RequestAndCompletionHandler(
-            time.milliseconds(),
-            controllerAddress.get,
-            request.request,
-            handleResponse(request)
-          ))
+        controllerOpt.foreach { activeController =>
+          if (activeController.epoch >= request.minControllerEpoch) {

Review Comment:
   To confirm, this check is done on the broker side, right? I guess the PR description alludes to this: a potentially more ideal solution would be for the controller to do the check on the server side, but that would require a version bump.


