You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/01/04 18:16:08 UTC
[kafka] branch trunk updated: MINOR: Fix concurrency bug in
MetadataCache and Metadata request when listeners inconsistent (#4374)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3ea2d91 MINOR: Fix concurrency bug in MetadataCache and Metadata request when listeners inconsistent (#4374)
3ea2d91 is described below
commit 3ea2d912c82a021bf43ba7a26b9b714c2fc0e817
Author: Ismael Juma <gi...@juma.me.uk>
AuthorDate: Thu Jan 4 18:16:04 2018 +0000
MINOR: Fix concurrency bug in MetadataCache and Metadata request when listeners inconsistent (#4374)
- Add missing locking/volatile in MetadataCache.aliveEndPoint
- Fix topic metadata not to throw BrokerNotAvailableException
when listeners are inconsistent. Add test verifying the fix. As
part of this fix, renamed Broker methods to follow Map
convention where the `get` version returns `Option`.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/admin/ConsumerGroupCommand.scala | 2 +-
core/src/main/scala/kafka/cluster/Broker.scala | 15 +++---
.../controller/ControllerChannelManager.scala | 6 +--
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 5 +-
.../main/scala/kafka/server/MetadataCache.scala | 10 ++--
.../main/scala/kafka/server/ReplicaManager.scala | 2 +-
.../main/scala/kafka/tools/UpdateOffsetsInZK.scala | 2 +-
.../scala/unit/kafka/admin/AddPartitionsTest.scala | 8 +--
.../api/RequestResponseSerializationTest.scala | 2 +-
.../unit/kafka/cluster/BrokerEndPointTest.scala | 12 ++---
.../scala/unit/kafka/server/KafkaApisTest.scala | 63 ++++++++++++++++++++--
.../unit/kafka/server/LeaderElectionTest.scala | 2 +-
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
14 files changed, 96 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index e835510..426dbfd 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -505,7 +505,7 @@ object ConsumerGroupCommand extends Logging {
private def getZkConsumer(brokerId: Int): Option[SimpleConsumer] = {
try {
zkUtils.getBrokerInfo(brokerId)
- .map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
+ .map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)))
.map(endPoint => new SimpleConsumer(endPoint.host, endPoint.port, 10000, 100000, "ConsumerGroupCommand"))
.orElse(throw new BrokerNotAvailableException("Broker id %d does not exist".format(brokerId)))
} catch {
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 425eafc..0106982 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -47,13 +47,16 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String]) {
this(bep.id, bep.host, bep.port, listenerName, protocol)
}
- def getNode(listenerName: ListenerName): Node = {
- val endpoint = endPointsMap.getOrElse(listenerName,
- throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id"))
- new Node(id, endpoint.host, endpoint.port, rack.orNull)
- }
+ def node(listenerName: ListenerName): Node =
+ getNode(listenerName).getOrElse {
+ throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found " +
+ s"for broker $id")
+ }
+
+ def getNode(listenerName: ListenerName): Option[Node] =
+ endPointsMap.get(listenerName).map(endpoint => new Node(id, endpoint.host, endpoint.port, rack.orNull))
- def getBrokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
+ def brokerEndPoint(listenerName: ListenerName): BrokerEndPoint = {
val endpoint = endPointsMap.getOrElse(listenerName,
throw new BrokerEndPointNotAvailableException(s"End point with listener name ${listenerName.value} not found for broker $id"))
new BrokerEndPoint(id, endpoint.host, endpoint.port)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 1b25f5f..e389821 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -109,7 +109,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
- val brokerNode = broker.getNode(config.interBrokerListenerName)
+ val brokerNode = broker.node(config.interBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
@@ -408,7 +408,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
}
val leaderIds = leaderAndIsrPartitionStates.map(_._2.basePartitionState.leader).toSet
val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)).map {
- _.getNode(controller.config.interBrokerListenerName)
+ _.node(controller.config.interBrokerListenerName)
}
val leaderAndIsrRequestBuilder = new LeaderAndIsrRequest.Builder(leaderAndIsrRequestVersion, controllerId,
controllerEpoch, leaderAndIsrPartitionStates.asJava, leaders.asJava)
@@ -436,7 +436,7 @@ class ControllerBrokerRequestBatch(controller: KafkaController, stateChangeLogge
controllerContext.liveOrShuttingDownBrokers.map { broker =>
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
- val node = broker.getNode(listenerName)
+ val node = broker.node(listenerName)
val endPoints = Seq(new EndPoint(node.host, node.port, securityProtocol, listenerName))
new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index f52a720..758a305 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -975,7 +975,7 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseMaybeThrottle(request, requestThrottleMs =>
new MetadataResponse(
requestThrottleMs,
- brokers.map(_.getNode(request.context.listenerName)).asJava,
+ brokers.flatMap(_.getNode(request.context.listenerName)).asJava,
clusterId,
metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
completeTopicMetadata.asJava
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index d073584..df06c1a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -374,10 +374,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
*/
private def controlledShutdown() {
- def node(broker: Broker): Node = {
- val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
- new Node(brokerEndPoint.id, brokerEndPoint.host, brokerEndPoint.port)
- }
+ def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)
val socketTimeoutMs = config.controllerSocketTimeoutMs
diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala
index fd11bfa..b4a015d 100755
--- a/core/src/main/scala/kafka/server/MetadataCache.scala
+++ b/core/src/main/scala/kafka/server/MetadataCache.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest
class MetadataCache(brokerId: Int) extends Logging {
private val cache = mutable.Map[String, mutable.Map[Int, UpdateMetadataRequest.PartitionState]]()
- private var controllerId: Option[Int] = None
+ @volatile private var controllerId: Option[Int] = None
private val aliveBrokers = mutable.Map[Int, Broker]()
private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()
private val partitionMetadataLock = new ReentrantReadWriteLock()
@@ -104,9 +104,11 @@ class MetadataCache(brokerId: Int) extends Logging {
}
def getAliveEndpoint(brokerId: Int, listenerName: ListenerName): Option[Node] =
- aliveNodes.get(brokerId).map { nodeMap =>
- nodeMap.getOrElse(listenerName,
- throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
+ inReadLock(partitionMetadataLock) {
+ aliveNodes.get(brokerId).map { nodeMap =>
+ nodeMap.getOrElse(listenerName,
+ throw new BrokerEndPointNotAvailableException(s"Broker `$brokerId` does not have listener with name `$listenerName`"))
+ }
}
// errorUnavailableEndpoints exists to support v0 MetadataResponses
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 593c876..f4bfe39 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1304,7 +1304,7 @@ class ReplicaManager(val config: KafkaConfig,
// we do not need to check if the leader exists again since this has been done at the beginning of this process
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
partition.topicPartition -> BrokerAndInitialOffset(
- metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
+ metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.brokerEndPoint(config.interBrokerListenerName),
partition.getReplica().get.highWatermark.messageOffset)).toMap
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
diff --git a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
index 20f1db2..204f5c1 100755
--- a/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
+++ b/core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala
@@ -70,7 +70,7 @@ object UpdateOffsetsInZK extends Logging {
zkUtils.getBrokerInfo(broker) match {
case Some(brokerInfo) =>
- val brokerEndPoint = brokerInfo.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val brokerEndPoint = brokerInfo.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
val consumer = new SimpleConsumer(brokerEndPoint.host, brokerEndPoint.port, 10000, 100 * 1024, "UpdateOffsetsInZk")
val topicAndPartition = TopicAndPartition(topic, partition)
val request = OffsetRequest(Map(topicAndPartition -> PartitionOffsetRequestInfo(offsetOption, 1)))
diff --git a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
index b68379b..43d8ec8 100755
--- a/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AddPartitionsTest.scala
@@ -109,7 +109,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 2)
val listenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
- val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.getBrokerEndPoint(listenerName)),
+ val metadata = ClientUtils.fetchTopicMetadata(Set(topic1), brokers.map(_.brokerEndPoint(listenerName)),
"AddPartitionsTest-testIncrementPartitions", 2000, 0).topicsMetadata
val metaDataForTopic1 = metadata.filter(p => p.topic.equals(topic1))
val partitionDataForTopic1 = metaDataForTopic1.head.partitionsMetadata.sortBy(_.partitionId)
@@ -138,7 +138,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 1)
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
- brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+ brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
"AddPartitionsTest-testManualAssignmentOfReplicas", 2000, 0).topicsMetadata
val metaDataForTopic2 = metadata.filter(_.topic == topic2)
val partitionDataForTopic2 = metaDataForTopic2.head.partitionsMetadata.sortBy(_.partitionId)
@@ -164,7 +164,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic3, 6)
val metadata = ClientUtils.fetchTopicMetadata(Set(topic3),
- brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+ brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
"AddPartitionsTest-testReplicaPlacementAllServers", 2000, 0).topicsMetadata
val metaDataForTopic3 = metadata.find(p => p.topic == topic3).get
@@ -187,7 +187,7 @@ class AddPartitionsTest extends ZooKeeperTestHarness {
TestUtils.waitUntilMetadataIsPropagated(servers, topic2, 2)
val metadata = ClientUtils.fetchTopicMetadata(Set(topic2),
- brokers.map(_.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
+ brokers.map(_.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))),
"AddPartitionsTest-testReplicaPlacementPartialServers", 2000, 0).topicsMetadata
val metaDataForTopic2 = metadata.find(p => p.topic == topic2).get
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 026786e..4bcf61d 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -158,7 +158,7 @@ object SerializationTestUtils {
def createConsumerMetadataResponse: GroupCoordinatorResponse = {
GroupCoordinatorResponse(Some(
- brokers.head.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE, 0)
+ brokers.head.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))), Errors.NONE, 0)
}
}
diff --git a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
index c60a7ed..380b5ad 100644
--- a/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/BrokerEndPointTest.scala
@@ -59,7 +59,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+ val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals("localhost", brokerEndPoint.host)
assertEquals(9093, brokerEndPoint.port)
}
@@ -76,7 +76,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
assertEquals("localhost", brokerEndPoint.host)
assertEquals(9092, brokerEndPoint.port)
}
@@ -86,7 +86,7 @@ class BrokerEndPointTest {
val brokerInfoStr = """{"jmx_port":-1,"timestamp":"1420485325400","host":"172.16.8.243","version":1,"port":9091}"""
val broker = parseBrokerJson(1, brokerInfoStr)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+ val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
assertEquals("172.16.8.243", brokerEndPoint.host)
assertEquals(9091, brokerEndPoint.port)
}
@@ -104,7 +104,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
+ val brokerEndPoint = broker.brokerEndPoint(ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals("host1", brokerEndPoint.host)
assertEquals(9093, brokerEndPoint.port)
assertEquals(Some("dc1"), broker.rack)
@@ -124,7 +124,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+ val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
assertEquals("host1", brokerEndPoint.host)
assertEquals(9092, brokerEndPoint.port)
assertEquals(None, broker.rack)
@@ -143,7 +143,7 @@ class BrokerEndPointTest {
}"""
val broker = parseBrokerJson(1, json)
assertEquals(1, broker.id)
- val brokerEndPoint = broker.getBrokerEndPoint(new ListenerName("CLIENT"))
+ val brokerEndPoint = broker.brokerEndPoint(new ListenerName("CLIENT"))
assertEquals("host1", brokerEndPoint.host)
assertEquals(9092, brokerEndPoint.port)
assertEquals(None, broker.rack)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fd6073d..a85a10b 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -61,9 +61,9 @@ class KafkaApisTest {
private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
private val controller = EasyMock.createNiceMock(classOf[KafkaController])
private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
- private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
private val metrics = new Metrics()
private val brokerId = 1
+ private val metadataCache = new MetadataCache(brokerId)
private val authorizer: Option[Authorizer] = None
private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
@@ -363,6 +363,61 @@ class KafkaApisTest {
testConsumerListOffsetLatest(IsolationLevel.READ_COMMITTED)
}
+ /**
+ * Verifies that the metadata response is correct if the broker listeners are inconsistent (i.e. one broker has
+ * more listeners than another) and the request is sent on the listener that exists in both brokers.
+ */
+ @Test
+ def testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers(): Unit = {
+ val (plaintextListener, _) = updateMetadataCacheWithInconsistentListeners()
+ val response = sendMetadataRequestWithInconsistentListeners(plaintextListener)
+ assertEquals(Set(0, 1), response.brokers.asScala.map(_.id).toSet)
+ }
+
+ /*
+ * Verifies that the metadata response is correct if the broker listeners are inconsistent (i.e. one broker has
+ * more listeners than another) and the request is sent on the listener that exists in one broker.
+ */
+ @Test
+ def testMetadataRequestOnDistinctListenerWithInconsistentListenersAcrossBrokers(): Unit = {
+ val (_, anotherListener) = updateMetadataCacheWithInconsistentListeners()
+ val response = sendMetadataRequestWithInconsistentListeners(anotherListener)
+ assertEquals(Set(0), response.brokers.asScala.map(_.id).toSet)
+ }
+
+ /**
+ * Return pair of listener names in the metadataCache: PLAINTEXT and LISTENER2 respectively.
+ */
+ private def updateMetadataCacheWithInconsistentListeners(): (ListenerName, ListenerName) = {
+ import UpdateMetadataRequest.{Broker => UBroker}
+ import UpdateMetadataRequest.{EndPoint => UEndPoint}
+ val plaintextListener = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ val anotherListener = new ListenerName("LISTENER2")
+ val brokers = Set(
+ new UBroker(0, Seq(new UEndPoint("broker0", 9092, SecurityProtocol.PLAINTEXT, plaintextListener),
+ new UEndPoint("broker0", 9093, SecurityProtocol.PLAINTEXT, anotherListener)).asJava, "rack"),
+ new UBroker(1, Seq(new UEndPoint("broker1", 9092, SecurityProtocol.PLAINTEXT, plaintextListener)).asJava,
+ "rack")
+ )
+ val updateMetadataRequest = new UpdateMetadataRequest.Builder(ApiKeys.UPDATE_METADATA.latestVersion, 0,
+ 0, Map.empty[TopicPartition, UpdateMetadataRequest.PartitionState].asJava, brokers.asJava).build()
+ metadataCache.updateCache(correlationId = 0, updateMetadataRequest)
+ (plaintextListener, anotherListener)
+ }
+
+ private def sendMetadataRequestWithInconsistentListeners(requestListener: ListenerName): MetadataResponse = {
+ val capturedResponse = EasyMock.newCapture[RequestChannel.Response]()
+ val capturedThrottleCallback = EasyMock.newCapture[Int => Unit]()
+ expectThrottleCallbackAndInvoke(capturedThrottleCallback)
+ EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+ EasyMock.replay(clientRequestQuotaManager, requestChannel)
+
+ val (metadataRequest, requestChannelRequest) = buildRequest(MetadataRequest.Builder.allTopics, requestListener)
+ createKafkaApis().handleTopicMetadataRequest(requestChannelRequest)
+
+ readResponse(ApiKeys.METADATA, metadataRequest, capturedResponse).asInstanceOf[MetadataResponse]
+ }
+
private def testConsumerListOffsetLatest(isolationLevel: IsolationLevel): Unit = {
val tp = new TopicPartition("foo", 0)
val latestOffset = 15L
@@ -400,14 +455,16 @@ class KafkaApisTest {
buildRequest(requestBuilder)
}
- private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T]): (T, RequestChannel.Request) = {
+ private def buildRequest[T <: AbstractRequest](builder: AbstractRequest.Builder[T],
+ listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): (T, RequestChannel.Request) = {
+
val request = builder.build()
val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
// read the header from the buffer first so that the body can be read next from the Request constructor
val header = RequestHeader.parse(buffer)
val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
- new ListenerName(""), SecurityProtocol.PLAINTEXT)
+ listenerName, SecurityProtocol.PLAINTEXT)
(request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0,
MemoryPool.NONE, buffer, requestChannelMetrics))
}
diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
index 9cbc260..6768b84 100755
--- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
@@ -134,7 +134,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness {
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", TestUtils.boundPort(s), listenerName,
securityProtocol))
- val nodes = brokers.map(_.getNode(listenerName))
+ val nodes = brokers.map(_.node(listenerName))
val controllerContext = new ControllerContext
controllerContext.liveBrokers = brokers.toSet
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 7781801..dc6ff9e 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -214,7 +214,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
}
private def sender(from: KafkaServer, to: KafkaServer): BlockingSend = {
- val endPoint = from.metadataCache.getAliveBrokers.find(_.id == to.config.brokerId).get.getBrokerEndPoint(from.config.interBrokerListenerName)
+ val endPoint = from.metadataCache.getAliveBrokers.find(_.id == to.config.brokerId).get.brokerEndPoint(from.config.interBrokerListenerName)
new ReplicaFetcherBlockingSend(endPoint, from.config, new Metrics(), new SystemTime(), 42, "TestFetcher", new LogContext())
}
--
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <co...@kafka.apache.org>'].