Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/19 11:59:30 UTC

[GitHub] [kafka] dengziming opened a new pull request #9622: KAFKA-10547; add topicId in MetadataResp

dengziming opened a new pull request #9622:
URL: https://github.com/apache/kafka/pull/9622


   
   1. Bump the versions of MetadataRequest and MetadataResponse, and add topicId to MetadataResponse.
   2. Update describeTopic in AdminClientTopicService and ZookeeperTopicService.
   3. TopicMetadata is cached in MetadataCache, so topicId must also be added to MetadataCache.
   4. MetadataCache is updated by UpdateMetadataRequest, so bump the versions of UpdateMetadataRequest and UpdateMetadataResponse and add topicId to UpdateMetadataRequest.
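Items 1 and 4 rely on Kafka's versioned message schemas: a field declared with `"versions": "10+"` is only on the wire for new enough versions, and a reader that negotiated an older version falls back to the field's default. The sketch below illustrates that rule only. It assumes `java.util.UUID` as a stand-in for Kafka's `Uuid`, and `TopicIdGate` / `topicIdForVersion` are hypothetical names, not Kafka's generated code.

```java
import java.util.UUID;

// Hypothetical helper illustrating a version-gated field; not Kafka's generated code.
// java.util.UUID stands in for org.apache.kafka.common.Uuid.
final class TopicIdGate {
    // All-zero id, playing the role of Uuid.ZERO_UUID ("no id available").
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // First MetadataResponse version that carries TopicId in this PR.
    static final short FIRST_TOPIC_ID_VERSION = 10;

    // Value a reader ends up with for a negotiated version: the real id when the
    // field is on the wire, otherwise the schema default (all zeros).
    static UUID topicIdForVersion(short version, UUID brokerTopicId) {
        if (version < FIRST_TOPIC_ID_VERSION || brokerTopicId == null) {
            return ZERO_UUID;
        }
        return brokerTopicId;
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(topicIdForVersion((short) 9, id));  // all-zero UUID
        System.out.println(topicIdForVersion((short) 10, id)); // the real id
    }
}
```

This is consistent with the "Old server + new client" result further down, where the client receives no id and prints an empty TopicId.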
   
   
   Tested locally; here are some results:
   
   New server + new client:
   
   kafka-topics.sh --describe --zookeeper localhost:2181 --topic old-version-topic 
   Topic: old-version-topic	TopicId: wRPl6VAlQeyE77bDxEESzg	PartitionCount: 2	ReplicationFactor: 1	Configs: 
   	Topic: old-version-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
   	Topic: old-version-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
   
   kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic 
   Topic: old-version-topic	TopicId: wRPl6VAlQeyE77bDxEESzg	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
   	Topic: old-version-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
   	Topic: old-version-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
   
   Old server + new client
   
   kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic 
   Topic: old-version-topic	TopicId: 	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
   	Topic: old-version-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
   	Topic: old-version-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
   
   New server + old client
   
   kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic old-version-topic 
   Topic: old-version-topic	PartitionCount: 2	ReplicationFactor: 1	Configs: segment.bytes=1073741824
   	Topic: old-version-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
   	Topic: old-version-topic	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544479887



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       So it seems like there is some logic to remove the partition states of deleted topics from the MetadataSnapshot. Would we want to do something similar there? Apologies if I'm missing something.
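The hunk above keeps every id already in the snapshot and overlays the non-zero ids from the incoming UpdateMetadataRequest, so as written nothing is ever removed. A minimal Java rendering of that merge, assuming plain maps from topic name to `java.util.UUID` (standing in for Kafka's `Uuid`); `TopicIdMerge` and `merge` are hypothetical names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical Java rendering of the Scala merge in the hunk; not Kafka code.
final class TopicIdMerge {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Copies the snapshot, then overlays non-zero ids from the request.
    // Topics absent from the request keep their old ids, so the map only grows.
    static Map<String, UUID> merge(Map<String, UUID> snapshot, Map<String, UUID> fromRequest) {
        Map<String, UUID> merged = new HashMap<>(snapshot);
        fromRequest.forEach((topic, id) -> {
            if (!ZERO_UUID.equals(id)) {
                merged.put(topic, id);
            }
        });
        return merged;
    }

    public static void main(String[] args) {
        Map<String, UUID> snapshot = Map.of("deleted-topic", UUID.randomUUID());
        Map<String, UUID> request = Map.of("new-topic", UUID.randomUUID(), "legacy", ZERO_UUID);
        // Contains both "deleted-topic" and "new-topic"; "legacy" is skipped.
        System.out.println(merge(snapshot, request).keySet());
    }
}
```

The entry for `deleted-topic` surviving the merge is exactly the growth the question is pointing at.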







[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528707065



##########
File path: clients/src/main/resources/common/message/MetadataRequest.json
##########
@@ -31,9 +31,11 @@
     // Starting in version 8, authorized operations can be requested for cluster and topic resource.
     //
     // Version 9 is the first flexible version.
+    // Version 10 add topicId
     { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
       "about": "The topics to fetch metadata for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "ignorable": true,

Review comment:
       I tried to use `nullable` but got an exception with the message `Unrecognized field "nullable"`.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544515142



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       Yes, that was my suggestion too, but my wording wasn't right. I will reword that to avoid confusion.







[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544479887



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       So it seems like there is some logic to remove the partition states of deleted topics from the MetadataSnapshot. Would we want to do something similar there but with topic IDs? Apologies if I'm missing something.







[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544782830



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       Thank you for your suggestions, I now understand the solution.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544150030



##########
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##########
@@ -223,6 +224,31 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
   }
 
+  @Test
+  def testTopicIdsInResponse(): Unit = {
+    val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    createTopic(topic1, replicaAssignment)
+    createTopic(topic2, replicaAssignment)
+
+    // if version < 9, return ZERO_UUID in MetadataResponse
+    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
+    assertEquals(2, resp1.topicMetadata.size)
+    resp1.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
+    }
+
+    // from version 10, UUID will be included in MetadataResponse
+    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
+    assertEquals(2, resp2.topicMetadata.size)
+    resp2.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId())

Review comment:
       We probably also want to assert that the topic id is not null here (even though we currently never return null).
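A small helper along these lines would cover both checks in one place. Checking for null first matters because `assertNotEquals(ZERO_UUID, null)` passes silently. `assertValidTopicId` is a hypothetical name, and `java.util.UUID` stands in for Kafka's `Uuid`:

```java
import java.util.UUID;

// Hypothetical test helper; not part of Kafka's test utilities.
final class TopicIdAssertions {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Fails if the id is missing (null) or is the all-zero placeholder.
    static void assertValidTopicId(String topic, UUID topicId) {
        if (topicId == null) {
            throw new AssertionError("topic id for " + topic + " is null");
        }
        if (ZERO_UUID.equals(topicId)) {
            throw new AssertionError("topic id for " + topic + " is ZERO_UUID");
        }
    }

    public static void main(String[] args) {
        assertValidTopicId("topic1", UUID.randomUUID()); // passes
        try {
            assertValidTopicId("topic2", null);
        } catch (AssertionError e) {
            System.out.println(e.getMessage()); // topic id for topic2 is null
        }
    }
}
```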

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       When a topic is deleted, brokers process UpdateMetadataRequest and remove deleted topics from their cache. We track deletion state in ZooKeeper and, as you mentioned, you can get this information by going directly to ZK in kafka-topics.sh. But we don't retain that information in every broker. I would remove the topic id in the code segment just below this when the topic is removed from the MetadataCache, since we clearly cannot have a map that keeps growing in brokers. Is there a reason why we would want to retain the topic id in every broker even after the topic has been deleted? We can't get this information through the existing metadata request from brokers anyway. I guess in future we can add additional metadata to track deleted topic ids if we want to, but for now it seems better to delete topic ids from MetadataCache when we delete the topic from the cache. What do you think?
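One way to follow this suggestion is to keep only the ids of topics that still have partition state, so a deleted topic's id disappears together with its partitions. A minimal sketch, assuming `java.util.UUID` for Kafka's `Uuid` and a hypothetical `pruneDeleted` helper; it is not the MetadataCache change that was eventually merged:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical sketch; not the MetadataCache implementation.
final class TopicIdPruning {
    // Keeps only ids whose topic still has partition state in the cache,
    // so the map cannot grow without bound as topics are deleted.
    static Map<String, UUID> pruneDeleted(Map<String, UUID> topicIds, Set<String> liveTopics) {
        Map<String, UUID> pruned = new HashMap<>(topicIds);
        pruned.keySet().retainAll(liveTopics);
        return pruned;
    }

    public static void main(String[] args) {
        Map<String, UUID> ids = Map.of("kept", UUID.randomUUID(), "deleted", UUID.randomUUID());
        System.out.println(pruneDeleted(ids, Set.of("kept")).keySet()); // [kept]
    }
}
```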

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##########
@@ -320,6 +333,7 @@ public String toString() {
             return "TopicMetadata{" +
                 "error=" + error +
                 ", topic='" + topic + '\'' +
+                ", topicId='" + topicId.toString() + '\'' +

Review comment:
       nit: `toString()` is unnecessary.
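String concatenation already converts an object operand via `String.valueOf`, which calls `toString()` and also renders a `null` reference as `"null"` instead of throwing, as an explicit `topicId.toString()` would. A quick illustration, with `java.util.UUID` standing in for Kafka's `Uuid` and a hypothetical `describe` method:

```java
import java.util.UUID;

// Hypothetical illustration of the toString() nit; not Kafka code.
final class ToStringNit {
    static String describe(String topic, UUID topicId) {
        // Concatenation invokes String.valueOf(topicId), which handles null.
        return "TopicMetadata{topic='" + topic + "', topicId='" + topicId + "'}";
    }

    public static void main(String[] args) {
        System.out.println(describe("t", new UUID(0L, 0L)));
        // prints TopicMetadata{topic='t', topicId='00000000-0000-0000-0000-000000000000'}
        System.out.println(describe("t", null));
        // prints TopicMetadata{topic='t', topicId='null'}
    }
}
```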







[GitHub] [kafka] rajinisivaram commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-748328137


   @dengziming Thanks for the PR, merging to trunk.





[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r543288152



##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -873,6 +883,11 @@ class ControllerChannelManagerTest {
       context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
       leaderIndex += 1
     }
+
+    context.allTopics ++= topics
+    for (topic <- topics if topicIds.contains(topic)) {

Review comment:
       done!







[GitHub] [kafka] rajinisivaram merged pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #9622:
URL: https://github.com/apache/kafka/pull/9622


   





[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528401033



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +350,18 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {
+        return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+    }
+
+    public String getTopicName(Uuid topiId) {

Review comment:
       Also, for getTopicId, we do getOrDefault. Is there a reason we wouldn't do that here?







[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528366073



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +350,18 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {
+        return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+    }
+
+    public String getTopicName(Uuid topiId) {

Review comment:
       nit: `Uuid topicId`








[GitHub] [kafka] jolshan commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397


   Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests before UpdateMetadata/Metadata, as ordered in the [JIRA ticket.](https://issues.apache.org/jira/browse/KAFKA-10545) There are just a few features for persisting the topic IDs I wanted to include. I'm thinking we could review this PR and my PR: https://github.com/apache/kafka/pull/9626 at the same time and merge mine first and yours immediately after.





[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528401568



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -113,6 +114,7 @@ object TopicCommand extends Logging {
     def printDescription(): Unit = {
       val configsAsString = config.entries.asScala.filter(!_.isDefault).map { ce => s"${ce.name}=${ce.value}" }.mkString(",")
       print(s"Topic: $topic")
+      print(s"\tTopicId: ${if(topicId == Uuid.ZERO_UUID) "" else topicId}")

Review comment:
       Do we want to print an empty string for TopicId? Would it make sense to include the Zero Uuid or not print a topic ID label at all?
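The three options raised here can be compared side by side. A minimal sketch of the description fragment, assuming `java.util.UUID` for Kafka's `Uuid`; the `Mode` enum and `formatTopicId` are hypothetical, and only `EMPTY_VALUE` approximates the PR's current `printDescription` line (it additionally treats null as missing):

```java
import java.util.UUID;

// Hypothetical comparison of ways to render a missing topic id; not TopicCommand.
final class TopicIdLabel {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    enum Mode { EMPTY_VALUE, PRINT_ZERO, OMIT_LABEL }

    static String formatTopicId(UUID topicId, Mode mode) {
        boolean missing = topicId == null || ZERO_UUID.equals(topicId);
        switch (mode) {
            case EMPTY_VALUE: // label kept, value blank when unknown
                return "\tTopicId: " + (missing ? "" : topicId);
            case PRINT_ZERO:  // always print whatever id was received
                return "\tTopicId: " + (topicId == null ? ZERO_UUID : topicId);
            case OMIT_LABEL:  // drop the column entirely when unknown
                return missing ? "" : "\tTopicId: " + topicId;
            default:
                throw new IllegalArgumentException("unknown mode " + mode);
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println("Topic: old-version-topic" + formatTopicId(ZERO_UUID, mode) + "\tPartitionCount: 2");
        }
    }
}
```

Roughly, keeping the label preserves a stable column layout for scripts, while omitting it makes old-server output match what an old client prints.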







[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528404378



##########
File path: core/src/main/scala/kafka/api/ApiVersion.scala
##########
@@ -424,6 +426,13 @@ case object KAFKA_2_7_IV2 extends DefaultApiVersion {
   val id: Int = 30
 }
 
+case object KAFKA_2_7_IV3 extends DefaultApiVersion {

Review comment:
       nit: this should be KAFKA_2_8_IV0, or KAFKA_2_8_IV1 if https://github.com/apache/kafka/pull/9601 merges
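The naming matters because inter-broker protocol versions form one ordered list, and a feature is switched on once the configured version reaches a given entry, so a new entry belongs under the release currently in development. A minimal sketch of that ordering idea; the enum is a hypothetical, abbreviated stand-in for `kafka.api.ApiVersion`, and the `KAFKA_2_8_IV0` gate is illustrative rather than the mapping Kafka ships:

```java
// Hypothetical, abbreviated stand-in for kafka.api.ApiVersion; not Kafka code.
enum InterBrokerVersion {
    KAFKA_2_7_IV0, KAFKA_2_7_IV1, KAFKA_2_7_IV2, KAFKA_2_8_IV0;

    // Illustrative gate: a feature added during 2.8 development is enabled
    // only once the configured version is at or after that entry.
    boolean sendsTopicIdsInUpdateMetadata() {
        return compareTo(KAFKA_2_8_IV0) >= 0;
    }

    public static void main(String[] args) {
        for (InterBrokerVersion v : values()) {
            System.out.println(v + " -> " + v.sendsTopicIdsInUpdateMetadata());
        }
    }
}
```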







[GitHub] [kafka] jolshan commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-731847801


   @dengziming Looks pretty good so far to me! I think it would be useful to write a few unit/integration tests to ensure the metadata snapshot behavior and describe topics work as expected.





[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528699411



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +350,18 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {
+        return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+    }
+
+    public String getTopicName(Uuid topiId) {

Review comment:
       Here, I tried to use getOrDefault but didn't know what the default value should be (an empty string "" doesn't seem like a good default topic name), so I just return null and let the caller decide. Suggestions are welcome if you have good ideas.
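The two lookups discussed here differ in whether a safe sentinel exists: an id can fall back to the all-zero value, while a name has no natural default, so it either returns null (the approach described above) or, as an alternative not taken in the PR, an `Optional`. A minimal sketch assuming `java.util.UUID` for Kafka's `Uuid`; `TopicIdIndex` is hypothetical, not Kafka's `Cluster` class:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical index mirroring the two lookups discussed; not Kafka's Cluster class.
final class TopicIdIndex {
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    private final Map<String, UUID> topicIds;
    private final Map<UUID, String> topicNames = new HashMap<>();

    TopicIdIndex(Map<String, UUID> topicIds) {
        this.topicIds = new HashMap<>(topicIds);
        // Build the reverse map once so name lookups by id are O(1).
        this.topicIds.forEach((name, id) -> topicNames.put(id, name));
    }

    // Ids have a natural sentinel, so a default is safe.
    UUID getTopicId(String topic) {
        return topicIds.getOrDefault(topic, ZERO_UUID);
    }

    // Names do not: return null and let the caller decide.
    String getTopicName(UUID topicId) {
        return topicNames.get(topicId);
    }

    // Alternative: make absence explicit in the return type.
    Optional<String> topicName(UUID topicId) {
        return Optional.ofNullable(topicNames.get(topicId));
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        TopicIdIndex index = new TopicIdIndex(Map.of("orders", id));
        System.out.println(index.getTopicId("missing"));        // 00000000-0000-0000-0000-000000000000
        System.out.println(index.getTopicName(id));             // orders
        System.out.println(index.topicName(UUID.randomUUID())); // Optional.empty
    }
}
```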







[GitHub] [kafka] rajinisivaram commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-748239592


   There is a compilation failure in the Java 8 build because the PR uses an API not available in Scala 2.12. Will push a fix.





[GitHub] [kafka] jolshan edited a comment on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397


   Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests before UpdateMetadata/Metadata, following the ordering of the [JIRA tickets.](https://issues.apache.org/jira/browse/KAFKA-8872) There are just a few features for persisting the topic IDs I wanted to include. I'm thinking we could review this PR and my PR: https://github.com/apache/kafka/pull/9626 at the same time and merge mine first and yours immediately after.





[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544180651



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       When a topic is deleted, brokers process UpdateMetadataRequest and remove deleted topics from their cache. We track deletion state in ZooKeeper and, as you mentioned, you can get this information by going directly to ZK in kafka-topics.sh. But we don't retain that information in every broker. I would remove the topic id in the code segment just below this when the partition state is removed from the MetadataCache, since we clearly cannot have a map that keeps growing in brokers. Is there a reason why we would want to retain the topic id in every broker even after the topic has been deleted? We can't get this information through the existing metadata request from brokers anyway. I guess in future we can add additional metadata to track deleted topic ids if we want to, but for now it seems better to delete topic ids from MetadataCache when we delete the partition from the cache. What do you think?







[GitHub] [kafka] dengziming commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730332596


   @rajinisivaram @jolshan Hi, PTAL.





[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r543289892



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -46,6 +46,8 @@
     private final Map<Integer, List<PartitionInfo>> partitionsByNode;
     private final Map<Integer, Node> nodesById;
     private final ClusterResource clusterResource;
+    private final Map<String, Uuid> topicIds;
+    private final Map<Uuid, String> topicNames;

Review comment:
       Thank you for your suggestions. `topicNames` is not used, so I just removed it. We can add it back if a future use case needs it.







[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r550906054



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +346,14 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {

Review comment:
       Thanks, looks good!







[GitHub] [kafka] jolshan edited a comment on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-730710397


   Hi @dengziming! Thanks for the PR! I was hoping to add LeaderAndIsrRequests before UpdateMetadata/Metadata, following the ordering of the [JIRA tickets.](https://issues.apache.org/jira/browse/KAFKA-8872) There are just a few features for persisting the topic IDs that I wanted to include. I'm thinking we could review this PR and my PR (https://github.com/apache/kafka/pull/9626) at the same time, then merge mine first and yours immediately after.





[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528407594



##########
File path: clients/src/main/resources/common/message/MetadataRequest.json
##########
@@ -31,9 +31,11 @@
     // Starting in version 8, authorized operations can be requested for cluster and topic resource.
     //
     // Version 9 is the first flexible version.
+    // Version 10 add topicId
     { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
       "about": "The topics to fetch metadata for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "ignorable": true,

Review comment:
       As discussed in the mailing thread and the KIP writeup, this should be "nullable"







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r543224093



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -46,6 +46,8 @@
     private final Map<Integer, List<PartitionInfo>> partitionsByNode;
     private final Map<Integer, Node> nodesById;
     private final ClusterResource clusterResource;
+    private final Map<String, Uuid> topicIds;
+    private final Map<Uuid, String> topicNames;

Review comment:
       Do we have a use case that requires both maps to be stored in Cluster for fast lookups? Or would it be sufficient to store it one-way and look up the single map for the other way?
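The one-way alternative raised here can be sketched as follows. This is an illustrative sketch, not the PR's code: `TopicIdIndex` is a hypothetical class, `java.util.UUID` stands in for Kafka's `Uuid`, and `ZERO_ID` stands in for `Uuid.ZERO_UUID`. Only the name-to-ID map is stored; the reverse lookup scans it, trading O(n) lookups for one fewer map to keep consistent.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch: java.util.UUID stands in for Kafka's Uuid,
// and ZERO_ID stands in for Uuid.ZERO_UUID.
final class TopicIdIndex {
    static final UUID ZERO_ID = new UUID(0L, 0L);

    // Single source of truth: topic name -> topic ID.
    private final Map<String, UUID> topicIds;

    TopicIdIndex(Map<String, UUID> topicIds) {
        this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
    }

    // O(1) lookup in the stored direction; ZERO_ID when the topic is unknown.
    UUID topicId(String topic) {
        return topicIds.getOrDefault(topic, ZERO_ID);
    }

    // O(n) reverse lookup by scanning the same map; null when the ID is unknown.
    String topicName(UUID id) {
        for (Map.Entry<String, UUID> e : topicIds.entrySet()) {
            if (e.getValue().equals(id)) {
                return e.getKey();
            }
        }
        return null;
    }
}
```

If reverse lookups turn out to be frequent, a second map (or one derived from the first at construction time) would avoid the scan.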

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -372,6 +380,8 @@ class MetadataCache(brokerId: Int) extends Logging {
   }
 
   case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                              topicIds: Map[String, Uuid],
+                              topicNames: Map[Uuid, String],

Review comment:
       It seems unnecessary to include both topicIds and topicNames in the constructor of this case class. We can add `val topicNames` inside this class that creates the second map from the first.
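In Scala this would be something like `val topicNames: Map[Uuid, String] = topicIds.map(_.swap)` declared in the case class body. A rough Java analogue of the same idea follows; `SnapshotSketch` is hypothetical and `java.util.UUID` stands in for Kafka's `Uuid`. Only `topicIds` is passed in, and the reverse map is computed once from it, so callers cannot construct a snapshot whose two maps disagree.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical analogue of MetadataSnapshot: only topicIds is a constructor
// argument; topicNames is derived from it. java.util.UUID stands in for Uuid.
final class SnapshotSketch {
    final Map<String, UUID> topicIds;
    final Map<UUID, String> topicNames;

    SnapshotSketch(Map<String, UUID> topicIds) {
        this.topicIds = Map.copyOf(topicIds);
        Map<UUID, String> reversed = new HashMap<>();
        // Swap each (name, id) pair into (id, name).
        this.topicIds.forEach((name, id) -> reversed.put(id, name));
        this.topicNames = Map.copyOf(reversed);
    }
}
```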

##########
File path: core/src/main/scala/kafka/log4j.properties
##########
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Unspecified loggers and loggers with additivity=true output to server.log and stdout
+# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
+log4j.rootLogger=INFO, stdout, kafkaAppender

Review comment:
       Was this file checked in by mistake? Can we remove?

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -873,6 +883,11 @@ class ControllerChannelManagerTest {
       context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
       leaderIndex += 1
     }
+
+    context.allTopics ++= topics
+    for (topic <- topics if topicIds.contains(topic)) {

Review comment:
       Couldn't we just do this instead of the for loop:
   ```
    topicIds.foreach { case (name, id) => context.addTopicId(name, id) }
   ```

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       We seem to be adding to the topicIds/topicNames map, but never removing anything.
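A merge that also drops IDs could look roughly like the sketch below. It is hypothetical: `TopicIdMerge.merge` and the `deletedTopics` parameter are illustrative names, not part of the PR, and `java.util.UUID` stands in for Kafka's `Uuid`. It keeps the diff's behavior of skipping zero IDs and adds the missing removal step.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper; ZERO_ID stands in for Uuid.ZERO_UUID.
final class TopicIdMerge {
    static final UUID ZERO_ID = new UUID(0L, 0L);

    static Map<String, UUID> merge(Map<String, UUID> previous,
                                   Map<String, UUID> incoming,
                                   Set<String> deletedTopics) {
        Map<String, UUID> merged = new HashMap<>(previous);
        incoming.forEach((name, id) -> {
            // Like the diff above, ignore entries that carry the zero ID.
            if (!ZERO_ID.equals(id)) {
                merged.put(name, id);
            }
        });
        // The step the review points out is missing: forget deleted topics.
        merged.keySet().removeAll(deletedTopics);
        return merged;
    }
}
```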

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -372,6 +380,8 @@ class MetadataCache(brokerId: Int) extends Logging {
   }
 
   case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                              topicIds: Map[String, Uuid],
+                              topicNames: Map[Uuid, String],

Review comment:
       Also same question as for Cluster: Do we have a use case that requires both maps to be stored for fast lookups? Or would it be sufficient to store it one-way and look up the single map for the other way?







[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528407594



##########
File path: clients/src/main/resources/common/message/MetadataRequest.json
##########
@@ -31,9 +31,11 @@
     // Starting in version 8, authorized operations can be requested for cluster and topic resource.
     //
     // Version 9 is the first flexible version.
+    // Version 10 add topicId
     { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", "nullableVersions": "1+",
       "about": "The topics to fetch metadata for.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
+      { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": true, "about": "The topic id." },
+      { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", "ignorable": true,

Review comment:
       As discussed in the mailing thread and the KIP writeup, this should be "nullable" rather than ignorable







[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r529122230



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +350,18 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {
+        return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+    }
+
+    public String getTopicName(Uuid topiId) {

Review comment:
       This is not used yet, but it will be needed when we look up TopicMetadata by topicId, so I added it in advance.







[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r543331298



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       Hi @rajinisivaram, thank you for your reply. I found that when KafkaController deletes a topic, it sends an `updateMetadataRequest` containing all of the topic's partitions with `state.leader=-2`, indicating that the partition is deleted. `MetadataCache` does not remove the topic from the cache; it just keeps the `UpdateMetadataPartitionState` of the deleted topic, so I also keep the topicId after deletion.
   
   Here I tried deleting a topic and then describing it:
   
   ➜  ~ kafka-topics.sh --delete --zookeeper localhost:2181 --topic old-version-topic
   Topic old-version-topic is marked for deletion.
   Note: This will have no impact if delete.topic.enable is not set to true.
   
   ➜  ~ kafka-topics.sh --describe --zookeeper localhost:2181 --topic old-version-topic
   Topic: old-version-topic TopicId: wRPl6VAlQeyE77bDxEESzg  PartitionCount: 1	ReplicationFactor: 1 Configs: 	MarkedForDeletion: true
   	Topic: old-version-topic	Partition: 0	Leader: 0	Replicas: 0	Isr: 0	MarkedForDeletion: true
   
   So the topicId of a deleted topic should also remain after deletion. When do you think we should remove the topicId from MetadataCache? Suggestions are welcome.







[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r550829179



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +346,14 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {

Review comment:
       Thank you for the reminder. I have added these two methods to the KIP page; please take a look if you have time.







[GitHub] [kafka] dengziming commented on pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#issuecomment-733441748


   > @dengziming Looks pretty good so far to me! I think it would be useful to write a few unit/integration tests to ensure the metadata snapshot behavior and describe topics work as expected.
   
   I added or updated some unit and integration tests in `MetadataRequestTest` and `ControllerChannelManagerTest`; please take a look.





[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r546863711



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +346,14 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {

Review comment:
       Can we also add this and any other new public APIs to the KIP page? 







[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528401568



##########
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##########
@@ -113,6 +114,7 @@ object TopicCommand extends Logging {
     def printDescription(): Unit = {
       val configsAsString = config.entries.asScala.filter(!_.isDefault).map { ce => s"${ce.name}=${ce.value}" }.mkString(",")
       print(s"Topic: $topic")
+      print(s"\tTopicId: ${if(topicId == Uuid.ZERO_UUID) "" else topicId}")

Review comment:
       Do we want to have a field TopicId that is empty? Would it make sense to include the Zero Uuid or not print a topic ID label at all?
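The two output shapes being compared can be sketched like this. `DescribeLine` is hypothetical; `java.util.UUID` stands in for Kafka's `Uuid`, so the printed ID uses the hyphenated hex form rather than the base64 form shown in the PR description (e.g. `wRPl6VAlQeyE77bDxEESzg`). Omitting the label keeps the line unchanged for topics without an ID, while always printing it keeps the column layout fixed.

```java
import java.util.UUID;

// Hypothetical sketch; ZERO_ID stands in for Uuid.ZERO_UUID.
final class DescribeLine {
    static final UUID ZERO_ID = new UUID(0L, 0L);

    // Option A (as in the diff): always print the label, blank when no ID is known.
    static String withEmptyLabel(String topic, UUID topicId) {
        return "Topic: " + topic + "\tTopicId: " + (ZERO_ID.equals(topicId) ? "" : topicId);
    }

    // Option B: omit the TopicId label entirely when no ID is known.
    static String omitLabel(String topic, UUID topicId) {
        StringBuilder sb = new StringBuilder("Topic: ").append(topic);
        if (!ZERO_ID.equals(topicId)) {
            sb.append("\tTopicId: ").append(topicId);
        }
        return sb.toString();
    }
}
```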







[GitHub] [kafka] jolshan commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r528830838



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +350,18 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {
+        return topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+    }
+
+    public String getTopicName(Uuid topiId) {

Review comment:
       I see... maybe I missed it, but I did not see the method being used. What use case do you expect for topic names in Cluster?







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544977551



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +346,14 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {

Review comment:
       nit: We don't use `get` prefix in other methods. Since this is part of the public API, can we change this to `topicId(String topic)` before merging?







[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544782830



##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       Thank you for your suggestions; I now understand the solution. I now remove the topicId in `removePartitionInfo`.
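The exact change to `removePartitionInfo` is not shown in this thread. A simplified sketch of the idea follows, assuming the topic ID is dropped once a topic's last partition is removed. `CacheSketch` is hypothetical: the real `MetadataCache` is Scala and stores `UpdateMetadataPartitionState` values (strings stand in for them here), and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified cache: topic -> (partition index -> state).
final class CacheSketch {
    final Map<String, Map<Integer, String>> partitionStates = new HashMap<>();
    final Map<String, UUID> topicIds = new HashMap<>();

    // Removes one partition; once the topic has no partitions left,
    // drops the topic entry and its ID together.
    boolean removePartitionInfo(String topic, int partitionId) {
        Map<Integer, String> partitions = partitionStates.get(topic);
        if (partitions == null) {
            return false;
        }
        boolean removed = partitions.remove(partitionId) != null;
        if (partitions.isEmpty()) {
            partitionStates.remove(topic);
            topicIds.remove(topic);
        }
        return removed;
    }
}
```

Tying the ID removal to the same point where the topic's partition map is dropped keeps the two structures from drifting apart.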







[GitHub] [kafka] dengziming commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544988459



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -327,6 +346,14 @@ public Node controller() {
         return controller;
     }
 
+    public Collection<Uuid> topicIds() {
+        return topicIds.values();
+    }
+
+    public Uuid getTopicId(String topic) {

Review comment:
       Thank you for the reminder, it's helpful.



