You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/02/22 18:26:34 UTC

[kafka] branch 2.8 updated: MINOR: Raft request thread should discover api versions (#10157)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new 1677ba1  MINOR: Raft request thread should discover api versions (#10157)
1677ba1 is described below

commit 1677ba1e6cf4b23129decb162f7065b1eb40802b
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Mon Feb 22 10:16:48 2021 -0800

    MINOR: Raft request thread should discover api versions (#10157)
    
    We do not plan to rely on the IBP in order to determine API versions for raft requests. Instead, we want to discover them through the ApiVersions API. This patch enables the flag to do so.
    
    In addition, this patch adds unsupported version as well as authentication version checking to all of the derivatives of `InterBrokerSendThread` which rely on dynamic api version discovery. Test cases for these checks have been added.
    
    Reviewers:  Ismael Juma <is...@juma.me.uk>, Chia-Ping Tsai <ch...@gmail.com>, Boyang Chen <bo...@confluent.io>
---
 .../scala/kafka/raft/KafkaNetworkChannel.scala     | 14 +++-
 core/src/main/scala/kafka/raft/RaftManager.scala   |  2 +-
 .../main/scala/kafka/server/AlterIsrManager.scala  | 14 +++-
 .../server/BrokerToControllerChannelManager.scala  | 18 +++--
 .../scala/kafka/server/ForwardingManager.scala     | 47 ++++++-----
 .../BrokerToControllerRequestThreadTest.scala      | 94 +++++++++++++++++++++-
 .../unit/kafka/raft/KafkaNetworkChannelTest.scala  | 15 +++-
 .../unit/kafka/server/AlterIsrManagerTest.scala    | 42 +++++++---
 .../unit/kafka/server/ForwardingManagerTest.scala  | 42 ++++++++++
 .../MockBrokerToControllerChannelManager.scala     |  4 +-
 10 files changed, 247 insertions(+), 45 deletions(-)

diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index 68f7b4a..d990391 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -124,8 +124,18 @@ class KafkaNetworkChannel(
     }
 
     def onComplete(clientResponse: ClientResponse): Unit = {
-      val response = if (clientResponse.authenticationException != null) {
-        errorResponse(request.data, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      val response = if (clientResponse.versionMismatch != null) {
+        error(s"Request $request failed due to unsupported version error",
+          clientResponse.versionMismatch)
+        errorResponse(request.data, Errors.UNSUPPORTED_VERSION)
+      } else if (clientResponse.authenticationException != null) {
+        // For now we treat authentication errors as retriable. We use the
+        // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+        // Note that `BrokerToControllerChannelManager` will still log the
+        // authentication errors so that users have a chance to fix the problem.
+        error(s"Request $request failed due to authentication error",
+          clientResponse.authenticationException)
+        errorResponse(request.data, Errors.NETWORK_EXCEPTION)
       } else if (clientResponse.wasDisconnected()) {
         errorResponse(request.data, Errors.BROKER_NOT_AVAILABLE)
       } else {
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 7bf34b0..ecf8934 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -271,7 +271,7 @@ class KafkaRaftManager[T](
     val maxInflightRequestsPerConnection = 1
     val reconnectBackoffMs = 50
     val reconnectBackoffMsMs = 500
-    val discoverBrokerVersions = false
+    val discoverBrokerVersions = true
 
     new NetworkClient(
       selector,
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index b58ca89..9ad734f 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -165,9 +165,19 @@ class DefaultAlterIsrManager(
       new ControllerRequestCompletionHandler {
         override def onComplete(response: ClientResponse): Unit = {
           debug(s"Received AlterIsr response $response")
-          val body = response.responseBody().asInstanceOf[AlterIsrResponse]
           val error = try {
-            handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            if (response.authenticationException != null) {
+              // For now we treat authentication errors as retriable. We use the
+              // `NETWORK_EXCEPTION` error code for lack of a good alternative.
+              // Note that `BrokerToControllerChannelManager` will still log the
+              // authentication errors so that users have a chance to fix the problem.
+              Errors.NETWORK_EXCEPTION
+            } else if (response.versionMismatch != null) {
+              Errors.UNSUPPORTED_VERSION
+            } else {
+              val body = response.responseBody().asInstanceOf[AlterIsrResponse]
+              handleAlterIsrResponse(body, message.brokerEpoch, inflightAlterIsrItems)
+            }
           } finally {
             // clear the flag so future requests can proceed
             clearInFlightRequest()
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 3b53522..621c867 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -327,10 +327,18 @@ class BrokerToControllerRequestThread(
     None
   }
 
-  private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected()) {
+  private[server] def handleResponse(queueItem: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
+    if (response.authenticationException != null) {
+      error(s"Request ${queueItem.request} failed due to authentication error with controller",
+        response.authenticationException)
+      queueItem.callback.onComplete(response)
+    } else if (response.versionMismatch != null) {
+      error(s"Request ${queueItem.request} failed due to unsupported version error",
+        response.versionMismatch)
+      queueItem.callback.onComplete(response)
+    } else if (response.wasDisconnected()) {
       updateControllerAddress(null)
-      requestQueue.putFirst(request)
+      requestQueue.putFirst(queueItem)
     } else if (response.responseBody().errorCounts().containsKey(Errors.NOT_CONTROLLER)) {
       // just close the controller connection and wait for metadata cache update in doWork
       activeControllerAddress().foreach { controllerAddress => {
@@ -338,9 +346,9 @@ class BrokerToControllerRequestThread(
         updateControllerAddress(null)
       }}
 
-      requestQueue.putFirst(request)
+      requestQueue.putFirst(queueItem)
     } else {
-      request.callback.onComplete(response)
+      queueItem.callback.onComplete(response)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala
index b77bd34..a6e22e1 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -106,29 +106,40 @@ class ForwardingManagerImpl(
 
     class ForwardingResponseHandler extends ControllerRequestCompletionHandler {
       override def onComplete(clientResponse: ClientResponse): Unit = {
-        val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
-        val envelopeError = envelopeResponse.error()
         val requestBody = request.body[AbstractRequest]
 
-        // Unsupported version indicates an incompatibility between controller and client API versions. This
-        // could happen when the controller changed after the connection was established. The forwarding broker
-        // should close the connection with the client and let it reinitialize the connection and refresh
-        // the controller API versions.
-        if (envelopeError == Errors.UNSUPPORTED_VERSION) {
-          responseCallback(None)
+        if (clientResponse.versionMismatch != null) {
+          debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to request $requestBody " +
+            s"due to unexpected version error", clientResponse.versionMismatch)
+          responseCallback(Some(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)))
+        } else if (clientResponse.authenticationException != null) {
+          debug(s"Returning `UNKNOWN_SERVER_ERROR` in response to request $requestBody " +
+            s"due to authentication error", clientResponse.authenticationException)
+          responseCallback(Some(requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)))
         } else {
-          val response = if (envelopeError != Errors.NONE) {
-            // A general envelope error indicates broker misconfiguration (e.g. the principal serde
-            // might not be defined on the receiving broker). In this case, we do not return
-            // the error directly to the client since it would not be expected. Instead we
-            // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
-            // on the broker.
-            debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
-            requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
+          val envelopeResponse = clientResponse.responseBody.asInstanceOf[EnvelopeResponse]
+          val envelopeError = envelopeResponse.error()
+
+          // Unsupported version indicates an incompatibility between controller and client API versions. This
+          // could happen when the controller changed after the connection was established. The forwarding broker
+          // should close the connection with the client and let it reinitialize the connection and refresh
+          // the controller API versions.
+          if (envelopeError == Errors.UNSUPPORTED_VERSION) {
+            responseCallback(None)
           } else {
-            parseResponse(envelopeResponse.responseData, requestBody, request.header)
+            val response = if (envelopeError != Errors.NONE) {
+              // A general envelope error indicates broker misconfiguration (e.g. the principal serde
+              // might not be defined on the receiving broker). In this case, we do not return
+              // the error directly to the client since it would not be expected. Instead we
+              // return `UNKNOWN_SERVER_ERROR` so that the user knows that there is a problem
+              // on the broker.
+              debug(s"Forwarded request $request failed with an error in the envelope response $envelopeError")
+              requestBody.getErrorResponse(Errors.UNKNOWN_SERVER_ERROR.exception)
+            } else {
+              parseResponse(envelopeResponse.responseData, requestBody, request.header)
+            }
+            responseCallback(Option(response))
           }
-          responseCallback(Option(response))
         }
       }
 
diff --git a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
index f02d4ac..676eb34 100644
--- a/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
+++ b/core/src/test/scala/kafka/server/BrokerToControllerRequestThreadTest.scala
@@ -18,16 +18,16 @@
 package kafka.server
 
 import java.util.Collections
-import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.{ClientResponse, ManualMetadataUpdater, Metadata, MockClient}
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.message.MetadataRequestData
-import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{AbstractRequest, MetadataRequest, MetadataResponse, RequestTestUtils}
 import org.apache.kafka.common.utils.MockTime
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import org.mockito.Mockito._
 
@@ -253,6 +253,94 @@ class BrokerToControllerRequestThreadTest {
     assertTrue(completionHandler.timedOut.get())
   }
 
+  @Test
+  def testUnsupportedVersionHandling(): Unit = {
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val controllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val activeController = new Node(controllerId, "host", 1234)
+
+    when(controllerNodeProvider.get()).thenReturn(Some(activeController))
+
+    val callbackResponse = new AtomicReference[ClientResponse]()
+    val completionHandler = new ControllerRequestCompletionHandler {
+      override def onTimeout(): Unit = fail("Unexpected timeout exception")
+      override def onComplete(response: ClientResponse): Unit = callbackResponse.set(response)
+    }
+
+    val queueItem = BrokerToControllerQueueItem(
+      time.milliseconds(),
+      new MetadataRequest.Builder(new MetadataRequestData()),
+      completionHandler
+    )
+
+    mockClient.prepareUnsupportedVersionResponse(request => request.apiKey == ApiKeys.METADATA)
+
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
+      config, time, "", retryTimeoutMs = Long.MaxValue)
+
+    testRequestThread.enqueue(queueItem)
+    pollUntil(testRequestThread, () => callbackResponse.get != null)
+    assertNotNull(callbackResponse.get.versionMismatch)
+  }
+
+  @Test
+  def testAuthenticationExceptionHandling(): Unit = {
+    val time = new MockTime()
+    val config = new KafkaConfig(TestUtils.createBrokerConfig(1, "localhost:2181"))
+    val controllerId = 2
+
+    val metadata = mock(classOf[Metadata])
+    val mockClient = new MockClient(time, metadata)
+
+    val controllerNodeProvider = mock(classOf[ControllerNodeProvider])
+    val activeController = new Node(controllerId, "host", 1234)
+
+    when(controllerNodeProvider.get()).thenReturn(Some(activeController))
+
+    val callbackResponse = new AtomicReference[ClientResponse]()
+    val completionHandler = new ControllerRequestCompletionHandler {
+      override def onTimeout(): Unit = fail("Unexpected timeout exception")
+      override def onComplete(response: ClientResponse): Unit = callbackResponse.set(response)
+    }
+
+    val queueItem = BrokerToControllerQueueItem(
+      time.milliseconds(),
+      new MetadataRequest.Builder(new MetadataRequestData()),
+      completionHandler
+    )
+
+    mockClient.createPendingAuthenticationError(activeController, 50)
+
+    val testRequestThread = new BrokerToControllerRequestThread(mockClient, new ManualMetadataUpdater(), controllerNodeProvider,
+      config, time, "", retryTimeoutMs = Long.MaxValue)
+
+    testRequestThread.enqueue(queueItem)
+    pollUntil(testRequestThread, () => callbackResponse.get != null)
+    assertNotNull(callbackResponse.get.authenticationException)
+  }
+
+  private def pollUntil(
+    requestThread: BrokerToControllerRequestThread,
+    condition: () => Boolean,
+    maxRetries: Int = 10
+  ): Unit = {
+    var tries = 0
+    do {
+      requestThread.doWork()
+      tries += 1
+    } while (!condition.apply() && tries < maxRetries)
+
+    if (!condition.apply()) {
+      fail(s"Condition failed to be met after polling $tries times")
+    }
+  }
+
   class TestRequestCompletionHandler(
     expectedResponse: Option[MetadataResponse] = None
   ) extends ControllerRequestCompletionHandler {
diff --git a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
index 0f755b1..41eac22 100644
--- a/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/KafkaNetworkChannelTest.scala
@@ -117,7 +117,7 @@ class KafkaNetworkChannelTest {
 
     for (apiKey <- RaftApis) {
       client.createPendingAuthenticationError(destinationNode, 100)
-      sendAndAssertErrorResponse(apiKey, destinationId, Errors.CLUSTER_AUTHORIZATION_FAILED)
+      sendAndAssertErrorResponse(apiKey, destinationId, Errors.NETWORK_EXCEPTION)
 
       // reset to clear backoff time
       client.reset()
@@ -145,6 +145,19 @@ class KafkaNetworkChannelTest {
     }
   }
 
+  @Test
+  def testUnsupportedVersionError(): Unit = {
+    val destinationId = 2
+    val destinationNode = new Node(destinationId, "127.0.0.1", 9092)
+    channel.updateEndpoint(destinationId, new InetAddressSpec(
+      new InetSocketAddress(destinationNode.host, destinationNode.port)))
+
+    for (apiKey <- RaftApis) {
+      client.prepareUnsupportedVersionResponse(request => request.apiKey == apiKey)
+      sendAndAssertErrorResponse(apiKey, destinationId, Errors.UNSUPPORTED_VERSION)
+    }
+  }
+
   private def sendTestRequest(
     apiKey: ApiKeys,
     destinationId: Int,
diff --git a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
index f0ae4b5..1074fd3 100644
--- a/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AlterIsrManagerTest.scala
@@ -19,11 +19,13 @@ package kafka.server
 
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicInteger
+
 import kafka.api.LeaderAndIsr
 import kafka.utils.{MockScheduler, MockTime}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{AuthenticationException, UnsupportedVersionException}
 import org.apache.kafka.common.message.AlterIsrResponseData
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -127,20 +129,39 @@ class AlterIsrManagerTest {
 
   @Test
   def testAuthorizationFailed(): Unit = {
-    checkTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED)
+    testRetryOnTopLevelError(Errors.CLUSTER_AUTHORIZATION_FAILED)
   }
 
   @Test
   def testStaleBrokerEpoch(): Unit = {
-    checkTopLevelError(Errors.STALE_BROKER_EPOCH)
+    testRetryOnTopLevelError(Errors.STALE_BROKER_EPOCH)
   }
 
   @Test
   def testUnknownServer(): Unit = {
-    checkTopLevelError(Errors.UNKNOWN_SERVER_ERROR)
+    testRetryOnTopLevelError(Errors.UNKNOWN_SERVER_ERROR)
+  }
+
+  @Test
+  def testRetryOnAuthenticationFailure(): Unit = {
+    testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
+      false, null, new AuthenticationException("authentication failed"), null))
+  }
+
+  @Test
+  def testRetryOnUnsupportedVersionError(): Unit = {
+    testRetryOnErrorResponse(new ClientResponse(null, null, "", 0L, 0L,
+      false, new UnsupportedVersionException("unsupported version"), null, null))
   }
 
-  private def checkTopLevelError(error: Errors): Unit = {
+  private def testRetryOnTopLevelError(error: Errors): Unit = {
+    val alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData().setErrorCode(error.code))
+    val response = new ClientResponse(null, null, "", 0L, 0L,
+      false, null, null, alterIsrResp)
+    testRetryOnErrorResponse(response)
+  }
+
+  private def testRetryOnErrorResponse(response: ClientResponse): Unit = {
     val leaderAndIsr = new LeaderAndIsr(1, 1, List(1,2,3), 10)
     val isrs = Seq(AlterIsrItem(tp0, leaderAndIsr, _ => { }, 0))
     val callbackCapture = EasyMock.newCapture[ControllerRequestCompletionHandler]()
@@ -156,10 +177,7 @@ class AlterIsrManagerTest {
 
     EasyMock.verify(brokerToController)
 
-    var alterIsrResp = new AlterIsrResponse(new AlterIsrResponseData().setErrorCode(error.code))
-    var resp = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, alterIsrResp)
-    callbackCapture.getValue.onComplete(resp)
+    callbackCapture.getValue.onComplete(response)
 
     // Any top-level error, we want to retry, so we don't clear items from the pending map
     assertTrue(alterIsrManager.unsentIsrUpdates.containsKey(tp0))
@@ -173,10 +191,10 @@ class AlterIsrManagerTest {
     scheduler.tick()
 
     // After a successful response, we can submit another AlterIsrItem
-    alterIsrResp = partitionResponse(tp0, Errors.NONE)
-    resp = new ClientResponse(null, null, "", 0L, 0L,
-      false, null, null, alterIsrResp)
-    callbackCapture.getValue.onComplete(resp)
+    val retryAlterIsrResponse = partitionResponse(tp0, Errors.NONE)
+    val retryResponse = new ClientResponse(null, null, "", 0L, 0L,
+      false, null, null, retryAlterIsrResponse)
+    callbackCapture.getValue.onComplete(retryResponse)
 
     EasyMock.verify(brokerToController)
 
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index 9240af6..2fefdac 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -155,6 +155,48 @@ class ForwardingManagerTest {
     assertEquals(Map(Errors.REQUEST_TIMED_OUT -> 1).asJava, alterConfigResponse.errorCounts)
   }
 
+  @Test
+  def testUnsupportedVersionFromNetworkClient(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
+
+    val controllerNode = new Node(0, "host", 1234)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode))
+
+    client.prepareUnsupportedVersionResponse(req => req.apiKey == requestHeader.apiKey)
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNotNull(response.get)
+
+    val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
+    assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, alterConfigResponse.errorCounts)
+  }
+
+  @Test
+  def testFailedAuthentication(): Unit = {
+    val requestCorrelationId = 27
+    val clientPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "client")
+    val (requestHeader, requestBuffer) = buildRequest(testAlterConfigRequest, requestCorrelationId)
+    val request = buildRequest(requestHeader, requestBuffer, clientPrincipal)
+
+    val controllerNode = new Node(0, "host", 1234)
+    Mockito.when(controllerNodeProvider.get()).thenReturn(Some(controllerNode))
+
+    client.createPendingAuthenticationError(controllerNode, 50)
+
+    val response = new AtomicReference[AbstractResponse]()
+    forwardingManager.forwardRequest(request, res => res.foreach(response.set))
+    brokerToController.poll()
+    assertNotNull(response.get)
+
+    val alterConfigResponse = response.get.asInstanceOf[AlterConfigsResponse]
+    assertEquals(Map(Errors.UNKNOWN_SERVER_ERROR -> 1).asJava, alterConfigResponse.errorCounts)
+  }
+
   private def buildRequest(
     body: AbstractRequest,
     correlationId: Int
diff --git a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala b/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
index a795b91..febd06f 100644
--- a/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
+++ b/core/src/test/scala/unit/kafka/server/MockBrokerToControllerChannelManager.scala
@@ -53,7 +53,9 @@ class MockBrokerToControllerChannelManager(
   }
 
   private[server] def handleResponse(request: BrokerToControllerQueueItem)(response: ClientResponse): Unit = {
-    if (response.wasDisconnected() || response.responseBody.errorCounts.containsKey(Errors.NOT_CONTROLLER)) {
+    if (response.authenticationException != null || response.versionMismatch != null) {
+      request.callback.onComplete(response)
+    } else if (response.wasDisconnected() || response.responseBody.errorCounts.containsKey(Errors.NOT_CONTROLLER)) {
       unsentQueue.addFirst(request)
     } else {
       request.callback.onComplete(response)