Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/04 18:16:08 UTC

[kafka] branch trunk updated: MINOR: Fix concurrency bug in MetadataCache and Metadata request when listeners inconsistent (#4374)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3ea2d91  MINOR: Fix concurrency bug in MetadataCache and Metadata request when listeners inconsistent (#4374)
3ea2d91 is described below

commit 3ea2d912c82a021bf43ba7a26b9b714c2fc0e817
Author: Ismael Juma <gi...@juma.me.uk>
AuthorDate: Thu Jan 4 18:16:04 2018 +0000

    MINOR: Fix concurrency bug in MetadataCache and Metadata request when listeners inconsistent (#4374)
    
    - Add missing locking/volatile in MetadataCache.getAliveEndpoint
    - Fix topic metadata handling so that it does not throw
    BrokerEndPointNotAvailableException when listeners are inconsistent
    across brokers. Add a test verifying the fix. As part of this fix,
    renamed Broker methods to follow the Map convention, where the
    `get` version returns an `Option`.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
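
In Map-convention terms, the rename gives callers an explicit choice between a
throwing lookup and an Option-returning one. A sketch of the resulting Broker
API surface (signatures taken from the Broker.scala hunk below):

    def node(listenerName: ListenerName): Node            // throws BrokerEndPointNotAvailableException
    def getNode(listenerName: ListenerName): Option[Node] // None when no endpoint uses the listener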
---
 .../scala/kafka/admin/ConsumerGroupCommand.scala   |  2 +-
 core/src/main/scala/kafka/cluster/Broker.scala     | 15 +++---
 .../controller/ControllerChannelManager.scala      |  6 +--
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  5 +-
 .../main/scala/kafka/server/MetadataCache.scala    | 10 ++--
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../main/scala/kafka/tools/UpdateOffsetsInZK.scala |  2 +-
 .../scala/unit/kafka/admin/AddPartitionsTest.scala |  8 +--
 .../api/RequestResponseSerializationTest.scala     |  2 +-
 .../unit/kafka/cluster/BrokerEndPointTest.scala    | 12 ++---
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 63 ++++++++++++++++++++--
 .../unit/kafka/server/LeaderElectionTest.scala     |  2 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  2 +-
 14 files changed, 96 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index e835510..426dbfd 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -505,7 +505,7 @@ object ConsumerGroupCommand extends Logging {
     private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
       try {
         zkUtils.getBrokerInfo(brokerId)
-          .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
+          .map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
           .map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerGroupCommand"))
           .orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)))
       } catch {
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 425eafc..0106982 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -47,13 +47,16 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
     this(bep.id, bep.host, bep.port, listenerName, protocol)
   }
 
-  def getNode(listenerName: ListenerName): Node = {
-    val endpoint = endPointsMap.getOrElse(listenerName,
-      throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id"))
-    new Node(id, endpoint.host, endpoint.port, rack.orNull)
-  }
+  def node(listenerName: ListenerName): Node =
+    getNode(listenerName).getOrElse {
+      throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found " +
+        s"for broker $id")
+    }
+
+  def getNode(listenerName: ListenerName): Option[Node] =
+    endPointsMap.get(listenerName).map(endpoint => new Node(id, endpoint.host, endpoint.port, rack.orNull))
 
-  def getBrokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
+  def brokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
     val endpoint = endPointsMap.getOrElse(listenerName,
       throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id"))
     new BrokerEndPoint(id, endpoint.host, endpoint.port)
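
A hedged usage sketch of the renamed pair, assuming a `broker: Broker` and a
`listener: ListenerName` already in scope (the variable names are illustrative):

    // Throwing variant: for call sites where a missing listener indicates a bug
    // or misconfiguration, e.g. the inter-broker listener in the controller.
    val node: Node = broker.node(listener)

    // Option variant: for call sites where a broker may legitimately lack the
    // listener, e.g. while listeners are being added during a rolling upgrade.
    broker.getNode(listener) match {
      case Some(n) => println(s"resolved ${n.host}:${n.port}")
      case None    => println(s"listener ${listener.value} not on broker ${broker.id}")
    }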
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 1b25f5f..e389821 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -109,7 +109,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
-    val brokerNode = broker.getNode(config.interBrokerListenerName)
+    val brokerNode = broker.node(config.interBrokerListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
@@ -408,7 +408,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
         }
         val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
         val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
-          _.getNode(controller.config.interBrokerListenerName)
+          _.node(controller.config.interBrokerListenerName)
         }
         val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
           controllerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)
@@ -436,7 +436,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
           controllerContext.liveOrShuttingDownBrokers.map { broker =>
             val securityProtocol = SecurityProtocol.PLAINTEXT
             val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-            val node = broker.getNode(listenerName)
+            val node = broker.node(listenerName)
             val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
             new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
           }
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f52a720..758a305 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -975,7 +975,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     sendResponseMaybeThrottle(request, requestThrottleMs =>
       new MetadataResponse(
         requestThrottleMs,
-        brokers.map(_.getNode(request.context.listenerName)).asJava,
+        brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
         clusterId,
         metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
         completeTopicMetadata.asJava
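
The map-to-flatMap change above is the heart of the metadata fix: a broker
whose getNode returns None is now omitted from the response instead of
aborting it with a BrokerEndPointNotAvailableException. The effect in plain
Scala (illustrative types, not Kafka's):

    case class B(id: Int, node: Option[String])
    val brokers = Seq(B(0, Some("broker0:9092")), B(1, None)) // broker 1 lacks the listener
    brokers.flatMap(_.node)                                   // Seq("broker0:9092"); broker 1 dropped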
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d073584..df06c1a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -374,10 +374,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
    */
   private def controlledShutdown() {
 
-    def node(broker: Broker): Node = {
-      val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
-      new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
-    }
+    def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)
 
     val socketTimeoutMs = config.controllerSocketTimeoutMs
 
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index fd11bfa..b4a015d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
 class MetadataCache(brokerId: Int) extends Logging {
 
   private val cache = mutable.Map[String, mutable.Map[Int, UpdateMetadataRequest.PartitionState]]()
-  private var controllerId: Option[Int] = None
+  @volatile private var controllerId: Option[Int] = None
   private val aliveBrokers = mutable.Map[Int, Broker]()
   private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
   private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -104,9 +104,11 @@ class MetadataCache(brokerId: Int) extends Logging {
   }
 
   def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
-    aliveNodes.get(brokerId).map { nodeMap =>
-      nodeMap.getOrElse(listenerName,
-        throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
+    inReadLock(partitionMetadataLock) {
+      aliveNodes.get(brokerId).map { nodeMap =>
+        nodeMap.getOrElse(listenerName,
+          throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
+      }
     }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
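
For reference, the two MetadataCache changes apply the standard pattern for
mixed mutable state: reads of the lock-guarded maps go through the read lock,
and any field read outside the lock is @volatile. A minimal self-contained
sketch of that pattern (simplified, not Kafka code):

    import java.util.concurrent.locks.ReentrantReadWriteLock
    import scala.collection.mutable

    class GuardedCache[K, V] {
      private val lock = new ReentrantReadWriteLock()
      private val data = mutable.Map[K, V]()   // guarded by `lock`
      @volatile private var lastUpdate = 0L    // read without the lock, hence @volatile

      def get(key: K): Option[V] = {
        lock.readLock().lock()
        try data.get(key) finally lock.readLock().unlock()
      }

      def put(key: K, value: V): Unit = {
        lock.writeLock().lock()
        try { data(key) = value; lastUpdate = System.currentTimeMillis() }
        finally lock.writeLock().unlock()
      }
    }

Without the read lock a reader can observe the map mid-update; without
@volatile a reader thread may never see another thread's write to controllerId.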
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 593c876..f4bfe39 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1304,7 +1304,7 @@ class ReplicaManager(val config: KafkaConfig,
         // we do not need to check if the leader exists again since this has been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
           partition.topicPartition -> BrokerAndInitialOffset(
-            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
+            metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName),
             partition.getReplica().get.highWatermark.messageOffset)).toMap
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 20f1db2..204f5c1 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -70,7 +70,7 @@ object UpdateOffsetsInZK extends Logging {
 
       zkUtils.getBrokerInfo(broker) match {
         case Some(brokerInfo) =>
-          val brokerEndPoint = brokerInfo.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+          val brokerEndPoint = brokerInfo.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
           val consumer = new SimpleConsumer(brokerEndPoint.host, brokerEndPoint.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
           val topicAndPartition = TopicAndPartition(topic, partition)
           val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index b68379b..43d8ec8 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -109,7 +109,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
     val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
-    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(listenerName)),
+    val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.brokerEndPoint(listenerName)),
       "AddPartitionsTest-testIncrementPartitions", 2000, 0).topicsMetadata
     val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
     val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata.sortBy(_.partitionId)
@@ -138,7 +138,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
     val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
-      brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+      brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
       "AddPartitionsTest-testManualAssignmentOfReplicas", 2000, 0).topicsMetadata
     val metaDataForTopic2 = metadata.filter(_.topic == topic2)
     val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId)
@@ -164,7 +164,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
 
     val metadata = ClientUtils.fetchTopicMetadata(Set(topic3),
-      brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+      brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
       "AddPartitionsTest-testReplicaPlacementAllServers", 2000, 0).topicsMetadata
 
     val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
@@ -187,7 +187,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
     TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
 
     val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
-      brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+      brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
       "AddPartitionsTest-testReplicaPlacementPartialServers", 2000, 0).topicsMetadata
 
     val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 026786e..4bcf61d 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -158,7 +158,7 @@ object SerializationTestUtils {
 
   def createConsumerMetadataResponse: GroupCoordinatorResponse = {
     GroupCoordinatorResponse(Some(
-      brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE, 0)
+      brokers.head.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE, 0)
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index c60a7ed..380b5ad 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -59,7 +59,7 @@ class BrokerEndPointTest {
     }"""
     val broker = parseBrokerJson(1, brokerInfoStr)
     assertEquals(1, broker.id)
-    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+    val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals("localhost", brokerEndPoint.host)
     assertEquals(9093, brokerEndPoint.port)
   }
@@ -76,7 +76,7 @@ class BrokerEndPointTest {
     }"""
     val broker = parseBrokerJson(1, brokerInfoStr)
     assertEquals(1, broker.id)
-    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+    val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     assertEquals("localhost", brokerEndPoint.host)
     assertEquals(9092, brokerEndPoint.port)
   }
@@ -86,7 +86,7 @@ class BrokerEndPointTest {
     val brokerInfoStr = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
     val broker = parseBrokerJson(1, brokerInfoStr)
     assertEquals(1, broker.id)
-    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+    val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
     assertEquals("172.16.8.243", brokerEndPoint.host)
     assertEquals(9091, brokerEndPoint.port)
   }
@@ -104,7 +104,7 @@ class BrokerEndPointTest {
     }"""
     val broker = parseBrokerJson(1, json)
     assertEquals(1, broker.id)
-    val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+    val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals("host1", brokerEndPoint.host)
     assertEquals(9093, brokerEndPoint.port)
     assertEquals(Some("dc1"), broker.rack)
@@ -124,7 +124,7 @@ class BrokerEndPointTest {
     }"""
     val broker = parseBrokerJson(1, json)
     assertEquals(1, broker.id)
-    val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+    val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
     assertEquals("host1", brokerEndPoint.host)
     assertEquals(9092, brokerEndPoint.port)
     assertEquals(None, broker.rack)
@@ -143,7 +143,7 @@ class BrokerEndPointTest {
     }"""
     val broker = parseBrokerJson(1, json)
     assertEquals(1, broker.id)
-    val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+    val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
     assertEquals("host1", brokerEndPoint.host)
     assertEquals(9092, brokerEndPoint.port)
     assertEquals(None, broker.rack)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fd6073d..a85a10b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -61,9 +61,9 @@ class KafkaApisTest {
   private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
   private val controller = EasyMock.createNiceMock(classOf[KafkaController])
   private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
-  private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
   private val metrics = new Metrics()
   private val brokerId = 1
+  private val metadataCache = new MetadataCache(brokerId)
   private val authorizer: Option[Authorizer] = None
   private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
   private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
@@ -363,6 +363,61 @@ class KafkaApisTest {
     testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
   }
 
+  /**
+   * Verifies that the metadata response is correct if the broker listeners are inconsistent (i.e. one broker has
+   * more listeners than another) and the request is sent on the listener that exists in both brokers.
+   */
+  @Test
+  def testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers(): Unit = {
+    val (plaintextListener, _) = updateMetadataCacheWithInconsistentListeners()
+    val response = sendMetadataRequestWithInconsistentListeners(plaintextListener)
+    assertEquals(Set(0, 1), response.brokers.asScala.map(_.id).toSet)
+  }
+
+  /*
+   * Verifies that the metadata response is correct if the broker listeners are inconsistent (i.e. one broker has
+   * more listeners than another) and the request is sent on the listener that exists in one broker.
+   */
+  @Test
+  def testMetadataRequestOnDistinctListenerWithInconsistentListenersAcrossBrokers(): Unit = {
+    val (_, anotherListener) = updateMetadataCacheWithInconsistentListeners()
+    val response = sendMetadataRequestWithInconsistentListeners(anotherListener)
+    assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
+  }
+
+  /**
+   * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
+   */
+  private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, ListenerName) = {
+    import UpdateMetadataRequest.{Broker => UBroker}
+    import UpdateMetadataRequest.{EndPoint => UEndPoint}
+    val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+    val anotherListener = new ListenerName("LISTENER2")
+    val brokers = Set(
+      new UBroker(0, Seq(new UEndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener),
+        new UEndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, anotherListener)).asJava, "rack"),
+      new UBroker(1, Seq(new UEndPoint("broker1", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava,
+        "rack")
+    )
+    val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+      0, Map.empty[TopicPartition, UpdateMetadataRequest.PartitionState].asJava, brokers.asJava).build()
+    metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
+    (plaintextListener, anotherListener)
+  }
+
+  private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = {
+    val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+    val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+    expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+    EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+    val (metadataRequest, requestChannelRequest) = buildRequest(MetadataRequest.Builder.allTopics, requestListener)
+    createKafkaApis().handleTopicMetadataRequest(requestChannelRequest)
+
+    readResponse(ApiKeys.METADATA, metadataRequest, capturedResponse).asInstanceOf[MetadataResponse]
+  }
+
   private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): Unit = {
     val tp = new TopicPartition("foo", 0)
     val latestOffset = 15L
@@ -400,14 +455,16 @@ class KafkaApisTest {
     buildRequest(requestBuilder)
   }
 
-  private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T]): (T, RequestChannel.Request) = {
+  private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
+      listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+
     val request = builder.build()
     val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
 
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
     val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
-      new ListenerName(""), SecurityProtocol.PLAINTEXT)
+      listenerName, SecurityProtocol.PLAINTEXT)
     (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0,
       MemoryPool.NONE, buffer, requestChannelMetrics))
   }
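
Taken together, the two new tests pin down the intended behavior (assertions as
they appear in the hunks above):

    // Request on the listener both brokers share: both appear in the response.
    assertEquals(Set(0, 1), response.brokers.asScala.map(_.id).toSet)

    // Request on LISTENER2, which only broker 0 exposes: broker 1 is omitted
    // from the response rather than causing an error.
    assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)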
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 9cbc260..6768b84 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -134,7 +134,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
     val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", TestUtils.boundPort(s), listenerName,
       securityProtocol))
-    val nodes = brokers.map(_.getNode(listenerName))
+    val nodes = brokers.map(_.node(listenerName))
 
     val controllerContext = new ControllerContext
     controllerContext.liveBrokers = brokers.toSet
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 7781801..dc6ff9e 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -214,7 +214,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
   }
 
   private def sender(from: KafkaServer, to: KafkaServer): BlockingSend = {
-    val endPoint = from.metadataCache.getAliveBrokers.find(_.id == to.config.brokerId).get.getBrokerEndPoint(from.config.interBrokerListenerName)
+    val endPoint = from.metadataCache.getAliveBrokers.find(_.id == to.config.brokerId).get.brokerEndPoint(from.config.interBrokerListenerName)
     new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher", new LogContext())
   }
 

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <co...@kafka.apache.org>.