You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/04 02:09:13 UTC

[GitHub] [kafka] mumrah opened a new pull request #10049: Refactor MetadataCache for Raft metadata

mumrah opened a new pull request #10049:
URL: https://github.com/apache/kafka/pull/10049


   This PR includes the changes to MetadataCache that were needed for handling Raft metadata records. It makes use of the classes merged in e119ea66cb1458637039c16169728c9e9989ec52


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10049:
URL: https://github.com/apache/kafka/pull/10049


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
       Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because it's endpoints map is empty. This leads to `cluster.leaderFor` on L534 returning null
   
   @hachikuji @cmccabe is a change in metadata behavior, or does this test have bad assumptions




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571148172



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
       Should be easy to do with all the fancy junit 5 stuff. Since there are no separate tests for `RaftMetadataCache` (unless I'm missing some), I think we should do it here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r569897756



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -157,102 +178,86 @@ class MetadataCache(brokerId: Int) extends Logging {
    *
    * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
    */
-  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] = {
-    snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
+  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
+    image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value()))
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
   def getTopicMetadata(topics: Set[String],
                        listenerName: ListenerName,
                        errorUnavailableEndpoints: Boolean = false,
                        errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
-    val snapshot = metadataSnapshot
+    val image = _currentImage
     topics.toSeq.flatMap { topic =>
-      getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+      getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
         new MetadataResponseTopic()
           .setErrorCode(Errors.NONE.code)
           .setName(topic)
-          .setTopicId(snapshot.topicIds.getOrElse(topic, Uuid.ZERO_UUID))

Review comment:
       Let me know if I missed it somewhere, but we will need the ID included in the response.
   
   I think we might need a MetadataImage method like `topicIdToName` but instead `topicNameToId`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571139392



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {

Review comment:
       nit: I don't think these factories are providing much over the constructors `MetdataCache.zkMetadataCache` vs `new ZkMetadataCache`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571208259



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala
##########
@@ -37,7 +36,15 @@ object MetadataBroker {
         endPoint.name() ->
           new Node(record.brokerId, endPoint.host, endPoint.port, record.rack)
       }.toMap,
-      true)
+      fenced = true)
+  }
+
+  def apply(broker: Broker): MetadataBroker = {
+    new MetadataBroker(broker.id, broker.rack.orNull,
+      broker.endPoints.map { endpoint =>
+        endpoint.listenerName.value -> new Node(broker.id, endpoint.host, endpoint.port, broker.rack.orNull)
+      }.toMap,
+      fenced = false)

Review comment:
       as long as this is just used by the ZK code path there is no harm




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571138962



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
       Yea, I think we can probably do some parameterization thing or possibly even use test templating similar to #9986




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571139784



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+    new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+    new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
       I left the ZK implementation in place since it's really the only production implementation for now. It also reduces the size of the diff for this change. I don't feel very strongly about it either way, so I'm happy to relocate it to a separate file




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571145320



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {

Review comment:
       I was thinking it might be easier to refactor in the future if we only need to rename the factory method rather than changing all the `new Class`. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571142951



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+    new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+    new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
       Perhaps we can do this as a follow-up? It is nice at the moment to see the diffs clearly.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571162641



##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                          topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) partitionStates.remove(topic)
+      true
+    }
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                               topic: String,
+                               partitionId: Int,
+                               stateInfo: UpdateMetadataPartitionState): Unit = {
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+    infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=${brokerId}] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, `brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(image, brokerId, listenerName))
+          res.add(brokerId)
+      }
+      res
+    }
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+                                   errorUnavailableListeners: Boolean): Option[Iterator[MetadataResponsePartition]] = {
+    val partitionsIterator = image.partitions.topicPartitions(topic)
+    if (!partitionsIterator.hasNext) {
+      None
+    } else {
+      Some(partitionsIterator.map { partition =>
+        val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+          listenerName, errorUnavailableEndpoints)
+        val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+          errorUnavailableEndpoints)
+        val maybeLeader = getAliveEndpoint(image, partition.leaderId, listenerName)
+        maybeLeader match {
+          case None =>
+            val error = if (image.aliveBroker(partition.leaderId).isEmpty) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: leader not available")
+              Errors.LEADER_NOT_AVAILABLE
+            } else {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: listener $listenerName " +
+                s"not found on leader ${partition.leaderId}")
+              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(MetadataResponse.NO_LEADER_ID)
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+
+          case Some(leader) =>
+            val error = if (filteredReplicas.size < partition.replicas.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: replica information not available for " +
+                s"following brokers ${partition.replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else if (filteredIsr.size < partition.isr.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: in sync replica information not available for " +
+                s"following brokers ${partition.isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else {
+              Errors.NONE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(leader.id())
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+        }
+      })
+    }
+  }
+
+  /**
+   * Check whether a broker is alive and has a registered listener matching the provided name.
+   * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
+   * a hotspot in metadata handling.
+   */
+  private def hasAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Boolean = {
+    image.brokers.aliveBroker(id).exists(_.endpoints.contains(listenerName.value()))
+  }
+
+  /**
+   * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+   * be added dynamically, so a broker with a missing listener could be a transient error.
+   *
+   * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+   */
+  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
+    image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value()))
+  }
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  override def getTopicMetadata(topics: Set[String],
+                                listenerName: ListenerName,
+                                errorUnavailableEndpoints: Boolean = false,
+                                errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
+    val image = _currentImage
+    topics.toSeq.flatMap { topic =>
+      getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+        new MetadataResponseTopic()
+          .setErrorCode(Errors.NONE.code)
+          .setName(topic)
+          .setTopicId(image.topicNameToId(topic).getOrElse(Uuid.ZERO_UUID))
+          .setIsInternal(Topic.isInternal(topic))
+          .setPartitions(partitionMetadata.toBuffer.asJava)
+      }
+    }
+  }
+
+  override def getAllTopics(): Set[String] = _currentImage.partitions.allTopicNames()
+
+  override def getAllPartitions(): Set[TopicPartition] = {
+    _currentImage.partitions.allPartitions().map {
+      partition => partition.toTopicPartition
+    }.toSet
+  }
+
+  override def getNonExistingTopics(topics: Set[String]): Set[String] = {
+    topics.diff(_currentImage.partitions.allTopicNames())
+  }
+
+  override def getAliveBroker(brokerId: Int): Option[MetadataBroker] = {
+    _currentImage.brokers.aliveBroker(brokerId)
+  }
+
+  override def getAliveBrokers: Seq[MetadataBroker] = {
+    _currentImage.brokers.aliveBrokers()
+  }
+
+  override def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
+    _currentImage.partitions.topicPartition(topic, partitionId).map { partition =>
+      new UpdateMetadataPartitionState().
+        setTopicName(partition.topicName).
+        setPartitionIndex(partition.partitionIndex).
+        setControllerEpoch(-1). // Controller epoch is not stored in the cache.
+        setLeader(partition.leaderId).
+        setLeaderEpoch(partition.leaderEpoch).
+        setIsr(partition.isr).
+        setZkVersion(-1) // ZK version is not stored in the cache.
+    }
+  }
+
+  override def numPartitions(topic: String): Option[Int] = {
+    _currentImage.partitions.numTopicPartitions(topic)
+  }
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  override def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(topic, partitionId).map { partition =>
+      image.aliveBroker(partition.leaderId) match {
+        case Some(broker) =>
+          broker.endpoints.getOrElse(listenerName.value(), Node.noNode)
+        case None =>
+          Node.noNode
+      }
+    }
+  }
+
+  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(tp.topic(), tp.partition()).map { partition =>
+      partition.replicas.asScala.map(replicaId => replicaId.intValue() -> {
+        image.aliveBroker(replicaId) match {
+          case Some(broker) =>
+            broker.endpoints.getOrElse(listenerName.value(), Node.noNode())
+          case None =>
+            Node.noNode()
+        }}).toMap
+        .filter(pair => pair match {
+          case (_, node) => !node.isEmpty
+        })
+    }.getOrElse(Map.empty[Int, Node])
+  }
+
+  override def getControllerId: Option[Int] = {
+    _currentImage.controllerId
+  }
+
+  override def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
+    val image = _currentImage
+    val nodes = new util.HashMap[Integer, Node]
+    image.brokers.aliveBrokers().foreach { node => if (!node.fenced) {
+      node.endpoints.get(listenerName.value()).foreach { nodes.put(node.id, _) }
+    }
+    }
+
+    def node(id: Integer): Node = {
+      Option(nodes.get(id)).getOrElse(new Node(id, "", -1))
+    }
+
+    val partitionInfos = new util.ArrayList[PartitionInfo]
+    val internalTopics = new util.HashSet[String]
+
+    image.partitions.allPartitions().foreach { partition =>
+      partitionInfos.add(new PartitionInfo(partition.topicName,
+        partition.partitionIndex, node(partition.leaderId),
+        partition.replicas.asScala.map(node).toArray,
+        partition.isr.asScala.map(node).toArray,
+        partition.offlineReplicas.asScala.map(node).toArray))
+      if (Topic.isInternal(partition.topicName)) {
+        internalTopics.add(partition.topicName)
+      }
+    }
+
+    new Cluster(clusterId, nodes.values(),
+      partitionInfos, Collections.emptySet[String], internalTopics,
+      node(Integer.valueOf(image.controllerId.getOrElse(-1))))
+  }
+
+  def stateChangeTraceEnabled(): Boolean = {
+    stateChangeLogger.isTraceEnabled
+  }
+
+  def logStateChangeTrace(str: String): Unit = {
+    stateChangeLogger.trace(str)
+  }
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  override def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): Seq[TopicPartition] = {

Review comment:
       We shouldn't need this for the RaftMetadataCache. Did we end up rewriting the code to update from the image builder?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571163226



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala
##########
@@ -37,7 +36,15 @@ object MetadataBroker {
         endPoint.name() ->
           new Node(record.brokerId, endPoint.host, endPoint.port, record.rack)
       }.toMap,
-      true)
+      fenced = true)
+  }
+
+  def apply(broker: Broker): MetadataBroker = {
+    new MetadataBroker(broker.id, broker.rack.orNull,
+      broker.endPoints.map { endpoint =>
+        endpoint.listenerName.value -> new Node(broker.id, endpoint.host, endpoint.port, broker.rack.orNull)
+      }.toMap,
+      fenced = false)

Review comment:
       I wanted to go ahead and conform to MetadataBroker for both implementations. One side-effect is we are exposing the fenced flag to ZK-based clusters. I've set it to false here since we don't have any notion of broker fencing in the ZK-based metadata. Is there any harm in this? 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571116863



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]

Review comment:
       can you put this in javadoc format ?  i.e. `@returns`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571161027



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
       Yea easy enough. I added `@ParameterizedTest` to MetadataCacheTest. One is failing with the Raft metadata cache, so I left that for ZK-only now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571119868



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
       we should have some way of running these tests on the raft metadata cache as well as the zk metadata cache.  I guess we can do that in a follow-up PR, though




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r569895315



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -106,13 +103,9 @@ class MetadataCacheTest {
         .setZkVersion(zkVersion)
         .setReplicas(asList(2, 1, 3)))
 
-    val topicIds = new util.HashMap[String, Uuid]()

Review comment:
       Are we just not testing topic Ids here anymore?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r569981529



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
       Why remove this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571253465



##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                          topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) partitionStates.remove(topic)
+      true
+    }
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                               topic: String,
+                               partitionId: Int,
+                               stateInfo: UpdateMetadataPartitionState): Unit = {
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+    infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, `brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(image, brokerId, listenerName))
+          res.add(brokerId)
+      }
+      res
+    }
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+                                   errorUnavailableListeners: Boolean): Option[Iterator[MetadataResponsePartition]] = {
+    val partitionsIterator = image.partitions.topicPartitions(topic)
+    if (!partitionsIterator.hasNext) {
+      None
+    } else {
+      Some(partitionsIterator.map { partition =>
+        val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+          listenerName, errorUnavailableEndpoints)
+        val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+          errorUnavailableEndpoints)
+        val maybeLeader = getAliveEndpoint(image, partition.leaderId, listenerName)
+        maybeLeader match {
+          case None =>
+            val error = if (image.aliveBroker(partition.leaderId).isEmpty) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: leader not available")
+              Errors.LEADER_NOT_AVAILABLE
+            } else {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: listener $listenerName " +
+                s"not found on leader ${partition.leaderId}")
+              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(MetadataResponse.NO_LEADER_ID)
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+
+          case Some(leader) =>
+            val error = if (filteredReplicas.size < partition.replicas.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: replica information not available for " +
+                s"following brokers ${partition.replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else if (filteredIsr.size < partition.isr.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: in sync replica information not available for " +
+                s"following brokers ${partition.isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else {
+              Errors.NONE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(leader.id())
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+        }
+      })
+    }
+  }
+
+  /**
+   * Check whether a broker is alive and has a registered listener matching the provided name.
+   * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
+   * a hotspot in metadata handling.
+   */
+  private def hasAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Boolean = {
+    image.brokers.aliveBroker(id).exists(_.endpoints.contains(listenerName.value()))
+  }
+
+  /**
+   * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+   * be added dynamically, so a broker with a missing listener could be a transient error.
+   *
+   * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+   */
+  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
+    image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value()))
+  }
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  override def getTopicMetadata(topics: Set[String],
+                                listenerName: ListenerName,
+                                errorUnavailableEndpoints: Boolean = false,
+                                errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
+    val image = _currentImage
+    topics.toSeq.flatMap { topic =>
+      getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+        new MetadataResponseTopic()
+          .setErrorCode(Errors.NONE.code)
+          .setName(topic)
+          .setTopicId(image.topicNameToId(topic).getOrElse(Uuid.ZERO_UUID))
+          .setIsInternal(Topic.isInternal(topic))
+          .setPartitions(partitionMetadata.toBuffer.asJava)
+      }
+    }
+  }
+
+  override def getAllTopics(): Set[String] = _currentImage.partitions.allTopicNames()
+
+  override def getAllPartitions(): Set[TopicPartition] = {
+    _currentImage.partitions.allPartitions().map {
+      partition => partition.toTopicPartition
+    }.toSet
+  }
+
+  override def getNonExistingTopics(topics: Set[String]): Set[String] = {
+    topics.diff(_currentImage.partitions.allTopicNames())
+  }
+
+  override def getAliveBroker(brokerId: Int): Option[MetadataBroker] = {
+    _currentImage.brokers.aliveBroker(brokerId)
+  }
+
+  override def getAliveBrokers: Seq[MetadataBroker] = {
+    _currentImage.brokers.aliveBrokers()
+  }
+
+  override def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
+    _currentImage.partitions.topicPartition(topic, partitionId).map { partition =>
+      new UpdateMetadataPartitionState().
+        setTopicName(partition.topicName).
+        setPartitionIndex(partition.partitionIndex).
+        setControllerEpoch(-1). // Controller epoch is not stored in the cache.
+        setLeader(partition.leaderId).
+        setLeaderEpoch(partition.leaderEpoch).
+        setIsr(partition.isr).
+        setZkVersion(-1) // ZK version is not stored in the cache.
+    }
+  }
+
+  override def numPartitions(topic: String): Option[Int] = {
+    _currentImage.partitions.numTopicPartitions(topic)
+  }
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  override def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(topic, partitionId).map { partition =>
+      image.aliveBroker(partition.leaderId) match {
+        case Some(broker) =>
+          broker.endpoints.getOrElse(listenerName.value(), Node.noNode)
+        case None =>
+          Node.noNode
+      }
+    }
+  }
+
+  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(tp.topic(), tp.partition()).map { partition =>
+      partition.replicas.asScala.map(replicaId => replicaId.intValue() -> {
+        image.aliveBroker(replicaId) match {
+          case Some(broker) =>
+            broker.endpoints.getOrElse(listenerName.value(), Node.noNode())
+          case None =>
+            Node.noNode()
+        }}).toMap
+        .filter(pair => pair match {
+          case (_, node) => !node.isEmpty
+        })
+    }.getOrElse(Map.empty[Int, Node])
+  }
+
+  override def getControllerId: Option[Int] = {
+    _currentImage.controllerId
+  }
+
+  override def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
+    val image = _currentImage
+    val nodes = new util.HashMap[Integer, Node]
+    image.brokers.aliveBrokers().foreach { node =>
+      if (!node.fenced) {
+        node.endpoints.get(listenerName.value()).foreach { nodes.put(node.id, _) }
+      }
+    }
+
+    def node(id: Integer): Node = {
+      Option(nodes.get(id)).getOrElse(new Node(id, "", -1))
+    }
+
+    val partitionInfos = new util.ArrayList[PartitionInfo]
+    val internalTopics = new util.HashSet[String]
+
+    image.partitions.allPartitions().foreach { partition =>
+      partitionInfos.add(new PartitionInfo(partition.topicName,
+        partition.partitionIndex, node(partition.leaderId),
+        partition.replicas.asScala.map(node).toArray,
+        partition.isr.asScala.map(node).toArray,
+        partition.offlineReplicas.asScala.map(node).toArray))
+      if (Topic.isInternal(partition.topicName)) {
+        internalTopics.add(partition.topicName)
+      }
+    }
+
+    new Cluster(clusterId, nodes.values(),
+      partitionInfos, Collections.emptySet[String], internalTopics,
+      node(Integer.valueOf(image.controllerId.getOrElse(-1))))
+  }
+
+  def stateChangeTraceEnabled(): Boolean = {
+    stateChangeLogger.isTraceEnabled
+  }
+
+  def logStateChangeTrace(str: String): Unit = {
+    stateChangeLogger.trace(str)
+  }
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  override def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): Seq[TopicPartition] = {
+    inLock(lock) {
+      val image = _currentImage
+      val builder = MetadataImageBuilder(brokerId, logger.underlying, image)
+
+      builder.controllerId(if (request.controllerId() < 0) None else Some(request.controllerId()))
+
+      // Compare the new brokers with the existing ones.
+      def toMetadataBroker(broker: UpdateMetadataBroker): MetadataBroker = {
+        val endpoints = broker.endpoints().asScala.map { endpoint =>
+          endpoint.listener -> new Node(broker.id(), endpoint.host(), endpoint.port())

Review comment:
       Note that here we are, somewhat incorrectly, excluding the broker's rack from the Node object. This was done to keep the behavior compatible with the existing MetadataCache (now ZkMetadataCache). Once we have this merged and 2.8 is released, we can work on determining if this was done for a good reason, or if it's a bug in the existing MetadataCache logic. Since this class isn't used for any production code yet, we can leave this as-is.

##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                          topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) partitionStates.remove(topic)
+      true
+    }
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                               topic: String,
+                               partitionId: Int,
+                               stateInfo: UpdateMetadataPartitionState): Unit = {
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+    infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, `brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(image, brokerId, listenerName))
+          res.add(brokerId)
+      }
+      res
+    }
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+                                   errorUnavailableListeners: Boolean): Option[Iterator[MetadataResponsePartition]] = {
+    val partitionsIterator = image.partitions.topicPartitions(topic)
+    if (!partitionsIterator.hasNext) {
+      None
+    } else {
+      Some(partitionsIterator.map { partition =>
+        val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+          listenerName, errorUnavailableEndpoints)
+        val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+          errorUnavailableEndpoints)
+        val maybeLeader = getAliveEndpoint(image, partition.leaderId, listenerName)
+        maybeLeader match {
+          case None =>
+            val error = if (image.aliveBroker(partition.leaderId).isEmpty) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: leader not available")
+              Errors.LEADER_NOT_AVAILABLE
+            } else {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: listener $listenerName " +
+                s"not found on leader ${partition.leaderId}")
+              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(MetadataResponse.NO_LEADER_ID)
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+
+          case Some(leader) =>
+            val error = if (filteredReplicas.size < partition.replicas.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: replica information not available for " +
+                s"following brokers ${partition.replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else if (filteredIsr.size < partition.isr.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: in sync replica information not available for " +
+                s"following brokers ${partition.isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else {
+              Errors.NONE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(leader.id())
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+        }
+      })
+    }
+  }
+
+  /**
+   * Check whether a broker is alive and has a registered listener matching the provided name.
+   * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
+   * a hotspot in metadata handling.
+   */
+  private def hasAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Boolean = {
+    image.brokers.aliveBroker(id).exists(_.endpoints.contains(listenerName.value()))
+  }
+
+  /**
+   * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+   * be added dynamically, so a broker with a missing listener could be a transient error.
+   *
+   * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+   */
+  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
+    image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value()))
+  }
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  override def getTopicMetadata(topics: Set[String],
+                                listenerName: ListenerName,
+                                errorUnavailableEndpoints: Boolean = false,
+                                errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
+    val image = _currentImage
+    topics.toSeq.flatMap { topic =>
+      getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+        new MetadataResponseTopic()
+          .setErrorCode(Errors.NONE.code)
+          .setName(topic)
+          .setTopicId(image.topicNameToId(topic).getOrElse(Uuid.ZERO_UUID))
+          .setIsInternal(Topic.isInternal(topic))
+          .setPartitions(partitionMetadata.toBuffer.asJava)
+      }
+    }
+  }
+
+  override def getAllTopics(): Set[String] = _currentImage.partitions.allTopicNames()
+
+  override def getAllPartitions(): Set[TopicPartition] = {
+    _currentImage.partitions.allPartitions().map {
+      partition => partition.toTopicPartition
+    }.toSet
+  }
+
+  override def getNonExistingTopics(topics: Set[String]): Set[String] = {
+    topics.diff(_currentImage.partitions.allTopicNames())
+  }
+
+  override def getAliveBroker(brokerId: Int): Option[MetadataBroker] = {
+    _currentImage.brokers.aliveBroker(brokerId)
+  }
+
+  override def getAliveBrokers: Seq[MetadataBroker] = {
+    _currentImage.brokers.aliveBrokers()
+  }
+
+  override def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
+    _currentImage.partitions.topicPartition(topic, partitionId).map { partition =>
+      new UpdateMetadataPartitionState().
+        setTopicName(partition.topicName).
+        setPartitionIndex(partition.partitionIndex).
+        setControllerEpoch(-1). // Controller epoch is not stored in the cache.
+        setLeader(partition.leaderId).
+        setLeaderEpoch(partition.leaderEpoch).
+        setIsr(partition.isr).
+        setZkVersion(-1) // ZK version is not stored in the cache.
+    }
+  }
+
+  override def numPartitions(topic: String): Option[Int] = {
+    _currentImage.partitions.numTopicPartitions(topic)
+  }
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  override def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(topic, partitionId).map { partition =>
+      image.aliveBroker(partition.leaderId) match {
+        case Some(broker) =>
+          broker.endpoints.getOrElse(listenerName.value(), Node.noNode)
+        case None =>
+          Node.noNode
+      }
+    }
+  }
+
+  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(tp.topic(), tp.partition()).map { partition =>
+      partition.replicas.asScala.map(replicaId => replicaId.intValue() -> {
+        image.aliveBroker(replicaId) match {
+          case Some(broker) =>
+            broker.endpoints.getOrElse(listenerName.value(), Node.noNode())
+          case None =>
+            Node.noNode()
+        }}).toMap
+        .filter(pair => pair match {
+          case (_, node) => !node.isEmpty
+        })
+    }.getOrElse(Map.empty[Int, Node])
+  }
+
+  override def getControllerId: Option[Int] = {
+    _currentImage.controllerId
+  }
+
+  override def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
+    val image = _currentImage
+    val nodes = new util.HashMap[Integer, Node]
+    image.brokers.aliveBrokers().foreach { node =>
+      if (!node.fenced) {
+        node.endpoints.get(listenerName.value()).foreach { nodes.put(node.id, _) }
+      }
+    }
+
+    def node(id: Integer): Node = {
+      Option(nodes.get(id)).getOrElse(new Node(id, "", -1))
+    }
+
+    val partitionInfos = new util.ArrayList[PartitionInfo]
+    val internalTopics = new util.HashSet[String]
+
+    image.partitions.allPartitions().foreach { partition =>
+      partitionInfos.add(new PartitionInfo(partition.topicName,
+        partition.partitionIndex, node(partition.leaderId),
+        partition.replicas.asScala.map(node).toArray,
+        partition.isr.asScala.map(node).toArray,
+        partition.offlineReplicas.asScala.map(node).toArray))
+      if (Topic.isInternal(partition.topicName)) {
+        internalTopics.add(partition.topicName)
+      }
+    }
+
+    new Cluster(clusterId, nodes.values(),
+      partitionInfos, Collections.emptySet[String], internalTopics,
+      node(Integer.valueOf(image.controllerId.getOrElse(-1))))
+  }
+
+  def stateChangeTraceEnabled(): Boolean = {
+    stateChangeLogger.isTraceEnabled
+  }
+
+  def logStateChangeTrace(str: String): Unit = {
+    stateChangeLogger.trace(str)
+  }
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  override def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): Seq[TopicPartition] = {
+    inLock(lock) {
+      val image = _currentImage
+      val builder = MetadataImageBuilder(brokerId, logger.underlying, image)
+
+      builder.controllerId(if (request.controllerId() < 0) None else Some(request.controllerId()))
+
+      // Compare the new brokers with the existing ones.
+      def toMetadataBroker(broker: UpdateMetadataBroker): MetadataBroker = {
+        val endpoints = broker.endpoints().asScala.map { endpoint =>
+          endpoint.listener -> new Node(broker.id(), endpoint.host(), endpoint.port())

Review comment:
       Note that here we are, somewhat incorrectly, excluding the broker's rack from the Node object. This was done to keep the behavior compatible with the existing MetadataCache (now ZkMetadataCache). Once we have this merged and 2.8 is released, we can work on determining if this was done for a good reason, or if it's a bug in the existing MetadataCache logic. Since this class isn't used for any production code yet, we can leave this as-is for now.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571118214



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+    new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+    new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
       This is kind of a lot of code so why not have it in its own file?
   
   The JavaDoc should also explain that this is for brokers using ZK and not for brokers using the metadata quorum.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
       Hmm, I think this was an artifact of the merge, I'll restore this test

##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
       Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because it's endpoints map is empty. 
   
   @hachikuji @cmccabe is a change in metadata behavior, or does this test have bad assumptions

##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
       Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because it's endpoints map is empty. This leads to `cluster.leaderFor` on L534 returning null
   
   @hachikuji @cmccabe is a change in metadata behavior, or does this test have bad assumptions




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571151499



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataImage.scala
##########
@@ -118,5 +118,10 @@ case class MetadataImage(partitions: MetadataPartitions,
   def topicIdToName(uuid: Uuid): Option[String] = {
     partitions.topicIdToName(uuid)
   }
+
+  def topicNameToId(name: String): Option[Uuid] = {
+

Review comment:
       nit: remove newline

##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                          topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) partitionStates.remove(topic)
+      true
+    }
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                               topic: String,
+                               partitionId: Int,
+                               stateInfo: UpdateMetadataPartitionState): Unit = {
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+    infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=${brokerId}] "

Review comment:
       nit: unneeded braces

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]

Review comment:
       nit: We can save it for a follow-up, but it would be nice to drop all the `get` prefixes here

##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                          topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) partitionStates.remove(topic)
+      true
+    }
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                               topic: String,
+                               partitionId: Int,
+                               stateInfo: UpdateMetadataPartitionState): Unit = {
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+    infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=${brokerId}] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, `brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(image, brokerId, listenerName))
+          res.add(brokerId)
+      }
+      res
+    }
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+                                   errorUnavailableListeners: Boolean): Option[Iterator[MetadataResponsePartition]] = {
+    val partitionsIterator = image.partitions.topicPartitions(topic)
+    if (!partitionsIterator.hasNext) {
+      None
+    } else {
+      Some(partitionsIterator.map { partition =>
+        val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+          listenerName, errorUnavailableEndpoints)
+        val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+          errorUnavailableEndpoints)
+        val maybeLeader = getAliveEndpoint(image, partition.leaderId, listenerName)
+        maybeLeader match {
+          case None =>
+            val error = if (image.aliveBroker(partition.leaderId).isEmpty) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: leader not available")
+              Errors.LEADER_NOT_AVAILABLE
+            } else {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: listener $listenerName " +
+                s"not found on leader ${partition.leaderId}")
+              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(MetadataResponse.NO_LEADER_ID)
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+
+          case Some(leader) =>
+            val error = if (filteredReplicas.size < partition.replicas.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: replica information not available for " +
+                s"following brokers ${partition.replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else if (filteredIsr.size < partition.isr.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: in sync replica information not available for " +
+                s"following brokers ${partition.isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else {
+              Errors.NONE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(leader.id())
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+        }
+      })
+    }
+  }
+
+  /**
+   * Check whether a broker is alive and has a registered listener matching the provided name.
+   * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
+   * a hotspot in metadata handling.
+   */
+  private def hasAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Boolean = {
+    image.brokers.aliveBroker(id).exists(_.endpoints.contains(listenerName.value()))
+  }
+
+  /**
+   * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+   * be added dynamically, so a broker with a missing listener could be a transient error.
+   *
+   * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+   */
+  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
+    image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value()))
+  }
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  override def getTopicMetadata(topics: Set[String],
+                                listenerName: ListenerName,
+                                errorUnavailableEndpoints: Boolean = false,
+                                errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
+    val image = _currentImage
+    topics.toSeq.flatMap { topic =>
+      getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+        new MetadataResponseTopic()
+          .setErrorCode(Errors.NONE.code)
+          .setName(topic)
+          .setTopicId(image.topicNameToId(topic).getOrElse(Uuid.ZERO_UUID))
+          .setIsInternal(Topic.isInternal(topic))
+          .setPartitions(partitionMetadata.toBuffer.asJava)
+      }
+    }
+  }
+
+  override def getAllTopics(): Set[String] = _currentImage.partitions.allTopicNames()
+
+  override def getAllPartitions(): Set[TopicPartition] = {
+    _currentImage.partitions.allPartitions().map {
+      partition => partition.toTopicPartition
+    }.toSet
+  }
+
+  override def getNonExistingTopics(topics: Set[String]): Set[String] = {
+    topics.diff(_currentImage.partitions.allTopicNames())
+  }
+
+  override def getAliveBroker(brokerId: Int): Option[MetadataBroker] = {
+    _currentImage.brokers.aliveBroker(brokerId)
+  }
+
+  override def getAliveBrokers: Seq[MetadataBroker] = {
+    _currentImage.brokers.aliveBrokers()
+  }
+
+  override def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
+    _currentImage.partitions.topicPartition(topic, partitionId).map { partition =>
+      new UpdateMetadataPartitionState().
+        setTopicName(partition.topicName).
+        setPartitionIndex(partition.partitionIndex).
+        setControllerEpoch(-1). // Controller epoch is not stored in the cache.
+        setLeader(partition.leaderId).
+        setLeaderEpoch(partition.leaderEpoch).
+        setIsr(partition.isr).
+        setZkVersion(-1) // ZK version is not stored in the cache.
+    }
+  }
+
+  override def numPartitions(topic: String): Option[Int] = {
+    _currentImage.partitions.numTopicPartitions(topic)
+  }
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  override def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(topic, partitionId).map { partition =>
+      image.aliveBroker(partition.leaderId) match {
+        case Some(broker) =>
+          broker.endpoints.getOrElse(listenerName.value(), Node.noNode)
+        case None =>
+          Node.noNode
+      }
+    }
+  }
+
+  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(tp.topic(), tp.partition()).map { partition =>
+      partition.replicas.asScala.map(replicaId => replicaId.intValue() -> {
+        image.aliveBroker(replicaId) match {
+          case Some(broker) =>
+            broker.endpoints.getOrElse(listenerName.value(), Node.noNode())
+          case None =>
+            Node.noNode()
+        }}).toMap
+        .filter(pair => pair match {
+          case (_, node) => !node.isEmpty
+        })
+    }.getOrElse(Map.empty[Int, Node])
+  }
+
+  override def getControllerId: Option[Int] = {
+    _currentImage.controllerId
+  }
+
+  override def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
+    val image = _currentImage
+    val nodes = new util.HashMap[Integer, Node]
+    image.brokers.aliveBrokers().foreach { node => if (!node.fenced) {
+      node.endpoints.get(listenerName.value()).foreach { nodes.put(node.id, _) }
+    }
+    }
+
+    def node(id: Integer): Node = {
+      Option(nodes.get(id)).getOrElse(new Node(id, "", -1))
+    }
+
+    val partitionInfos = new util.ArrayList[PartitionInfo]
+    val internalTopics = new util.HashSet[String]
+
+    image.partitions.allPartitions().foreach { partition =>
+      partitionInfos.add(new PartitionInfo(partition.topicName,
+        partition.partitionIndex, node(partition.leaderId),
+        partition.replicas.asScala.map(node).toArray,
+        partition.isr.asScala.map(node).toArray,
+        partition.offlineReplicas.asScala.map(node).toArray))
+      if (Topic.isInternal(partition.topicName)) {
+        internalTopics.add(partition.topicName)
+      }
+    }
+
+    new Cluster(clusterId, nodes.values(),
+      partitionInfos, Collections.emptySet[String], internalTopics,
+      node(Integer.valueOf(image.controllerId.getOrElse(-1))))
+  }
+
+  def stateChangeTraceEnabled(): Boolean = {
+    stateChangeLogger.isTraceEnabled
+  }
+
+  def logStateChangeTrace(str: String): Unit = {
+    stateChangeLogger.trace(str)
+  }
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  override def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): Seq[TopicPartition] = {

Review comment:
       We shouldn't need this for the RaftMetadataCache. Did we end up rewriting the log to update from the image builder?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571140218



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]

Review comment:
       Yea, this just got pulled up from the class when I extracted the trait. I'll fix up these comments




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571116386



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(

Review comment:
       The formatting is kind of weird here.  Why the blank line after `getTopicMetadata(`?
   
   Also, it would be good to have Javadoc for this function




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r569981529



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
       Why remove this?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
       Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because it's endpoints map is empty. 
   
   @hachikuji @cmccabe is a change in metadata behavior, or does this test have bad assumptions




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571169638



##########
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##########
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                          topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) partitionStates.remove(topic)
+      true
+    }
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                               topic: String,
+                               partitionId: Int,
+                               stateInfo: UpdateMetadataPartitionState): Unit = {
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+    infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=${brokerId}] "
+
+  private val lock = new ReentrantLock()
+
+  //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock)
+  //replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  //multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, `brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(image, brokerId, listenerName))
+          res.add(brokerId)
+      }
+      res
+    }
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topic: String, listenerName: ListenerName, errorUnavailableEndpoints: Boolean,
+                                   errorUnavailableListeners: Boolean): Option[Iterator[MetadataResponsePartition]] = {
+    val partitionsIterator = image.partitions.topicPartitions(topic)
+    if (!partitionsIterator.hasNext) {
+      None
+    } else {
+      Some(partitionsIterator.map { partition =>
+        val filteredReplicas = maybeFilterAliveReplicas(image, partition.replicas,
+          listenerName, errorUnavailableEndpoints)
+        val filteredIsr = maybeFilterAliveReplicas(image, partition.isr, listenerName,
+          errorUnavailableEndpoints)
+        val maybeLeader = getAliveEndpoint(image, partition.leaderId, listenerName)
+        maybeLeader match {
+          case None =>
+            val error = if (image.aliveBroker(partition.leaderId).isEmpty) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: leader not available")
+              Errors.LEADER_NOT_AVAILABLE
+            } else {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: listener $listenerName " +
+                s"not found on leader ${partition.leaderId}")
+              if (errorUnavailableListeners) Errors.LISTENER_NOT_FOUND else Errors.LEADER_NOT_AVAILABLE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(MetadataResponse.NO_LEADER_ID)
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+
+          case Some(leader) =>
+            val error = if (filteredReplicas.size < partition.replicas.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: replica information not available for " +
+                s"following brokers ${partition.replicas.asScala.filterNot(filteredReplicas.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else if (filteredIsr.size < partition.isr.size) {
+              debug(s"Error while fetching metadata for ${partition.toTopicPartition}: in sync replica information not available for " +
+                s"following brokers ${partition.isr.asScala.filterNot(filteredIsr.contains).mkString(",")}")
+              Errors.REPLICA_NOT_AVAILABLE
+            } else {
+              Errors.NONE
+            }
+
+            new MetadataResponsePartition()
+              .setErrorCode(error.code)
+              .setPartitionIndex(partition.partitionIndex)
+              .setLeaderId(leader.id())
+              .setLeaderEpoch(partition.leaderEpoch)
+              .setReplicaNodes(filteredReplicas)
+              .setIsrNodes(filteredIsr)
+              .setOfflineReplicas(partition.offlineReplicas)
+        }
+      })
+    }
+  }
+
+  /**
+   * Check whether a broker is alive and has a registered listener matching the provided name.
+   * This method was added to avoid unnecessary allocations in [[maybeFilterAliveReplicas]], which is
+   * a hotspot in metadata handling.
+   */
+  private def hasAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Boolean = {
+    image.brokers.aliveBroker(id).exists(_.endpoints.contains(listenerName.value()))
+  }
+
+  /**
+   * Get the endpoint matching the provided listener if the broker is alive. Note that listeners can
+   * be added dynamically, so a broker with a missing listener could be a transient error.
+   *
+   * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
+   */
+  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
+    image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value()))
+  }
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  override def getTopicMetadata(topics: Set[String],
+                                listenerName: ListenerName,
+                                errorUnavailableEndpoints: Boolean = false,
+                                errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
+    val image = _currentImage
+    topics.toSeq.flatMap { topic =>
+      getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+        new MetadataResponseTopic()
+          .setErrorCode(Errors.NONE.code)
+          .setName(topic)
+          .setTopicId(image.topicNameToId(topic).getOrElse(Uuid.ZERO_UUID))
+          .setIsInternal(Topic.isInternal(topic))
+          .setPartitions(partitionMetadata.toBuffer.asJava)
+      }
+    }
+  }
+
+  override def getAllTopics(): Set[String] = _currentImage.partitions.allTopicNames()
+
+  override def getAllPartitions(): Set[TopicPartition] = {
+    _currentImage.partitions.allPartitions().map {
+      partition => partition.toTopicPartition
+    }.toSet
+  }
+
+  override def getNonExistingTopics(topics: Set[String]): Set[String] = {
+    topics.diff(_currentImage.partitions.allTopicNames())
+  }
+
+  override def getAliveBroker(brokerId: Int): Option[MetadataBroker] = {
+    _currentImage.brokers.aliveBroker(brokerId)
+  }
+
+  override def getAliveBrokers: Seq[MetadataBroker] = {
+    _currentImage.brokers.aliveBrokers()
+  }
+
+  override def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataPartitionState] = {
+    _currentImage.partitions.topicPartition(topic, partitionId).map { partition =>
+      new UpdateMetadataPartitionState().
+        setTopicName(partition.topicName).
+        setPartitionIndex(partition.partitionIndex).
+        setControllerEpoch(-1). // Controller epoch is not stored in the cache.
+        setLeader(partition.leaderId).
+        setLeaderEpoch(partition.leaderEpoch).
+        setIsr(partition.isr).
+        setZkVersion(-1) // ZK version is not stored in the cache.
+    }
+  }
+
+  override def numPartitions(topic: String): Option[Int] = {
+    _currentImage.partitions.numTopicPartitions(topic)
+  }
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  override def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(topic, partitionId).map { partition =>
+      image.aliveBroker(partition.leaderId) match {
+        case Some(broker) =>
+          broker.endpoints.getOrElse(listenerName.value(), Node.noNode)
+        case None =>
+          Node.noNode
+      }
+    }
+  }
+
+  override def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] = {
+    val image = _currentImage
+    image.partitions.topicPartition(tp.topic(), tp.partition()).map { partition =>
+      partition.replicas.asScala.map(replicaId => replicaId.intValue() -> {
+        image.aliveBroker(replicaId) match {
+          case Some(broker) =>
+            broker.endpoints.getOrElse(listenerName.value(), Node.noNode())
+          case None =>
+            Node.noNode()
+        }}).toMap
+        .filter(pair => pair match {
+          case (_, node) => !node.isEmpty
+        })
+    }.getOrElse(Map.empty[Int, Node])
+  }
+
+  override def getControllerId: Option[Int] = {
+    _currentImage.controllerId
+  }
+
+  override def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster = {
+    val image = _currentImage
+    val nodes = new util.HashMap[Integer, Node]
+    image.brokers.aliveBrokers().foreach { node => if (!node.fenced) {
+      node.endpoints.get(listenerName.value()).foreach { nodes.put(node.id, _) }
+    }
+    }
+
+    def node(id: Integer): Node = {
+      Option(nodes.get(id)).getOrElse(new Node(id, "", -1))
+    }
+
+    val partitionInfos = new util.ArrayList[PartitionInfo]
+    val internalTopics = new util.HashSet[String]
+
+    image.partitions.allPartitions().foreach { partition =>
+      partitionInfos.add(new PartitionInfo(partition.topicName,
+        partition.partitionIndex, node(partition.leaderId),
+        partition.replicas.asScala.map(node).toArray,
+        partition.isr.asScala.map(node).toArray,
+        partition.offlineReplicas.asScala.map(node).toArray))
+      if (Topic.isInternal(partition.topicName)) {
+        internalTopics.add(partition.topicName)
+      }
+    }
+
+    new Cluster(clusterId, nodes.values(),
+      partitionInfos, Collections.emptySet[String], internalTopics,
+      node(Integer.valueOf(image.controllerId.getOrElse(-1))))
+  }
+
+  def stateChangeTraceEnabled(): Boolean = {
+    stateChangeLogger.isTraceEnabled
+  }
+
+  def logStateChangeTrace(str: String): Unit = {
+    stateChangeLogger.trace(str)
+  }
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  override def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): Seq[TopicPartition] = {

Review comment:
       Discussed offline. This is useful temporarily to allow reuse of `MetadataCacheTest`. Once 2.8 is cut, we'll hopefully be able to consolidate this logic to use the MetadataImage for both zk and raft.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571163776



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -485,7 +494,7 @@ class MetadataCacheTest {
 
   @Test

Review comment:
       This test is failing in Raft mode. I'll investigate




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
       Hmm, I think this was an artifact of the merge, I'll restore this test




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r569896329



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -157,102 +178,86 @@ class MetadataCache(brokerId: Int) extends Logging {
    *
    * @return None if broker is not alive or if the broker does not have a listener named `listenerName`.
    */
-  private def getAliveEndpoint(snapshot: MetadataSnapshot, brokerId: Int, listenerName: ListenerName): Option[Node] = {
-    snapshot.aliveNodes.get(brokerId).flatMap(_.get(listenerName))
+  private def getAliveEndpoint(image: MetadataImage, id: Int, listenerName: ListenerName): Option[Node] = {
+    image.brokers.aliveBroker(id).flatMap(_.endpoints.get(listenerName.value()))
   }
 
   // errorUnavailableEndpoints exists to support v0 MetadataResponses
   def getTopicMetadata(topics: Set[String],
                        listenerName: ListenerName,
                        errorUnavailableEndpoints: Boolean = false,
                        errorUnavailableListeners: Boolean = false): Seq[MetadataResponseTopic] = {
-    val snapshot = metadataSnapshot
+    val image = _currentImage
     topics.toSeq.flatMap { topic =>
-      getPartitionMetadata(snapshot, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
+      getPartitionMetadata(image, topic, listenerName, errorUnavailableEndpoints, errorUnavailableListeners).map { partitionMetadata =>
         new MetadataResponseTopic()
           .setErrorCode(Errors.NONE.code)
           .setName(topic)
-          .setTopicId(snapshot.topicIds.getOrElse(topic, Uuid.ZERO_UUID))

Review comment:
       Is there a place where we are setting the topic ID in the metadata response?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571149211



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {

Review comment:
       Ok. Probably not a ton of work either way, but will leave it up to you.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571173506



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]

Review comment:
       KAFKA-12299




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571173703



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+                        topics: collection.Set[String],
+                        listenerName: ListenerName,
+                        errorUnavailableEndpoints: Boolean = false,
+                        errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+    new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+    new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
       KAFKA-12299




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org