Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/22 21:14:48 UTC

[GitHub] [kafka] jsancio commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface

jsancio commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r656432494



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -359,10 +359,14 @@ class BrokerServer(
       // Start other services that we've delayed starting, in the appropriate order.
       replicaManager.startup()
       replicaManager.startHighWatermarkCheckPointThread()
-      groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
-        getOrElse(config.offsetsTopicPartitions))
-      transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
-        getOrElse(config.transactionTopicPartitions))
+      groupCoordinator.startup(() => {
+        val curPartitions = metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
+        if (curPartitions > 0) curPartitions else  config.offsetsTopicPartitions
+      })
+      transactionCoordinator.startup(() => {
+        val curPartitions = metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME)
+        if (curPartitions > 0) curPartitions else  config.transactionTopicPartitions
+      })

Review comment:
       For these two `startup` calls you can use this syntax:
   ```
   startup { () =>
   }
   ```
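
       Applied to the hunk above, the two calls would read roughly like this (a sketch of the suggested single-argument block syntax, reusing the names from the diff; not compile-checked):
   ```scala
   // Same fallback logic as the diff, just passing the thunk with brace syntax.
   groupCoordinator.startup { () =>
     val curPartitions = metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
     if (curPartitions > 0) curPartitions else config.offsetsTopicPartitions
   }
   transactionCoordinator.startup { () =>
     val curPartitions = metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME)
     if (curPartitions > 0) curPartitions else config.transactionTopicPartitions
   }
   ```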

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1164,7 +1164,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     var unauthorizedForCreateTopics = Set[String]()
 
     if (authorizedTopics.nonEmpty) {
-      val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
+      val nonExistingTopics = authorizedTopics.filter(!metadataCache.contains(_))

Review comment:
   ```suggestion
      val nonExistingTopics = authorizedTopics.filterNot(metadataCache.contains)
   ```
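
       For reference, `filterNot` takes the membership test directly, so the explicit negation disappears. A small self-contained sketch, with plain `Set`s standing in for the cache (illustrative names only):
   ```scala
   object FilterNotSketch extends App {
     // Stand-ins: topics the cache knows about, and the topics the caller is authorized on.
     val knownTopics = Set("orders", "payments")
     val authorizedTopics = Set("orders", "clicks")

     // An explicit lambda with a negation...
     val nonExisting1 = authorizedTopics.filter(topic => !knownTopics.contains(topic))
     // ...is equivalent to handing the predicate straight to filterNot.
     val nonExisting2 = authorizedTopics.filterNot(knownTopics.contains)

     println(nonExisting1 == nonExisting2) // true; both are Set("clicks")
   }
   ```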

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -151,17 +154,32 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  private def mockGetAliveBrokerFunctions(cache: MetadataCache, aliveBrokers: Seq[Node]): Unit = {
+    Mockito.when(cache.hasAliveBroker(ArgumentMatchers.anyInt())).thenAnswer(new Answer[Boolean]() {
+      override def answer(invocation: InvocationOnMock): Boolean = {
+        aliveBrokers.map(_.id()).contains(invocation.getArguments()(0).asInstanceOf[Int])
+      }
+    })
+    Mockito.when(cache.getAliveBrokerNode(ArgumentMatchers.anyInt(), ArgumentMatchers.any[String])).
+      thenAnswer(new Answer[Option[Node]]() {
+        override def answer(invocation: InvocationOnMock): Option[Node] = {
+          aliveBrokers.find(node => node.id == invocation.getArguments()(0).asInstanceOf[Integer])
+        }
+      })
+    Mockito.when(cache.getAliveBrokerNodes(ArgumentMatchers.any[String])).thenReturn(aliveBrokers)
+  }
+
   @Test
   def testClearPurgatoryOnBecomingFollower(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+
+  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)

Review comment:
       The indentation seems off.
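
       For example, restoring the removed line's indentation keeps the statement two spaces inside the method body, as in the rest of the file (the elided remainder is unchanged):
   ```scala
   @Test
   def testClearPurgatoryOnBecomingFollower(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
     // ... rest of the test body unchanged ...
   }
   ```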

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1352,8 +1370,7 @@ class ReplicaManagerTest {
       Optional.of(1))
     val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
     assertNull(fetchResult.get)
-
-    Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
+    Mockito.when(replicaManager.metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true)

Review comment:
   ```suggestion
    Mockito.when(metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true)
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2886,7 +2885,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseCallback(error)(partitionErrors)
     } else {
       val partitions = if (electionRequest.data.topicPartitions == null) {
-        metadataCache.getAllPartitions()
+        metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions(_))

Review comment:
   ```suggestion
        metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions)
   ```

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]
 
   def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
 
-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
       The nice thing about using `Option[Int]` is that it forces the caller to handle the missing-topic (`None`) case explicitly. For example, in some places the caller converts `None` to `0`, while in others it falls back to a configuration value.
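
       A small self-contained sketch of the trade-off, with a `Map` standing in for the cache (illustrative names, not the real `MetadataCache` API):
   ```scala
   object NumPartitionsSketch extends App {
     val partitionCounts = Map("__consumer_offsets" -> 50) // stand-in for the cache
     val configuredDefault = 50                            // stand-in for a config value

     def numPartitionsOpt(topic: String): Option[Int] = partitionCounts.get(topic)
     def numPartitionsZero(topic: String): Int = partitionCounts.getOrElse(topic, 0)

     // Option[Int]: the compiler forces the caller to decide what "missing" means.
     val fromOption = numPartitionsOpt("no-such-topic").getOrElse(configuredDefault)

     // Int with a 0 sentinel: the caller has to remember that 0 means "missing".
     val n = numPartitionsZero("no-such-topic")
     val fromSentinel = if (n > 0) n else configuredDefault

     println(s"fromOption=$fromOption fromSentinel=$fromSentinel")
   }
   ```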



