Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/15 10:29:21 UTC

[GitHub] [kafka] rajinisivaram commented on a change in pull request #9622: KAFKA-10547; add topicId in MetadataResp

rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r543224093



##########
File path: clients/src/main/java/org/apache/kafka/common/Cluster.java
##########
@@ -46,6 +46,8 @@
     private final Map<Integer, List<PartitionInfo>> partitionsByNode;
     private final Map<Integer, Node> nodesById;
     private final ClusterResource clusterResource;
+    private final Map<String, Uuid> topicIds;
+    private final Map<Uuid, String> topicNames;

Review comment:
       Do we have a use case that requires both maps to be stored in Cluster for fast lookups? Or would it be sufficient to store the mapping one way and scan that single map for the reverse lookup?
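
For illustration, the one-way option could look roughly like the sketch below. It is only a sketch under stated assumptions: `TopicIdLookup` is a hypothetical helper, not part of `Cluster`, and `java.util.UUID` stands in for Kafka's `Uuid` so the example compiles without the Kafka client on the classpath. The forward lookup stays a hash lookup, while the reverse lookup scans the single map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper for illustration only; not part of Kafka's Cluster class.
// java.util.UUID is a stand-in for org.apache.kafka.common.Uuid.
final class TopicIdLookup {
    // The only stored mapping: topic name -> topic ID.
    private final Map<String, UUID> topicIds;

    TopicIdLookup(Map<String, UUID> topicIds) {
        // Defensive copy so later changes to the caller's map are not visible here.
        this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));
    }

    // Forward lookup: a single hash lookup.
    Optional<UUID> topicId(String name) {
        return Optional.ofNullable(topicIds.get(name));
    }

    // Reverse lookup: linear scan over the single map, O(number of topics).
    Optional<String> topicName(UUID id) {
        return topicIds.entrySet().stream()
            .filter(entry -> entry.getValue().equals(id))
            .map(Map.Entry::getKey)
            .findFirst();
    }
}
```

Whether the linear scan is acceptable depends on how often the reverse lookup is needed and how many topics the cluster has, which is the question being asked here.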

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -372,6 +380,8 @@ class MetadataCache(brokerId: Int) extends Logging {
   }
 
   case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                              topicIds: Map[String, Uuid],
+                              topicNames: Map[Uuid, String],

Review comment:
       It seems unnecessary to include both topicIds and topicNames in the constructor of this case class. We could add a `val topicNames` inside the class that derives the second map from the first.
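
The same idea, sketched in Java rather than Scala so that it can be compiled in isolation: the constructor takes only the name-to-ID map and derives the inverse once. `TopicIdSnapshot` is a hypothetical class, not the real `MetadataSnapshot`, and `java.util.UUID` is a stand-in for Kafka's `Uuid`. The sketch assumes topic IDs are unique per topic; if two names shared an ID, the later entry would silently win in the inverse map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical class for illustration only; not Kafka's MetadataSnapshot.
// java.util.UUID is a stand-in for org.apache.kafka.common.Uuid.
final class TopicIdSnapshot {
    private final Map<String, UUID> topicIds;
    private final Map<UUID, String> topicNames;

    // Callers pass only the forward map; the inverse is derived here.
    TopicIdSnapshot(Map<String, UUID> topicIds) {
        this.topicIds = Collections.unmodifiableMap(new HashMap<>(topicIds));

        // Invert the forward map once at construction time.
        Map<UUID, String> inverted = new HashMap<>();
        for (Map.Entry<String, UUID> entry : this.topicIds.entrySet()) {
            inverted.put(entry.getValue(), entry.getKey());
        }
        this.topicNames = Collections.unmodifiableMap(inverted);
    }

    Map<String, UUID> topicIds() {
        return topicIds;
    }

    Map<UUID, String> topicNames() {
        return topicNames;
    }
}
```

Deriving the inverse inside the class means the two maps cannot be constructed out of sync with each other.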

##########
File path: core/src/main/scala/kafka/log4j.properties
##########
@@ -0,0 +1,91 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Unspecified loggers and loggers with additivity=true output to server.log and stdout
+# Note that INFO only applies to unspecified loggers, the log level of the child logger is used otherwise
+log4j.rootLogger=INFO, stdout, kafkaAppender

Review comment:
       Was this file checked in by mistake? Can we remove it?

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -873,6 +883,11 @@ class ControllerChannelManagerTest {
       context.updatePartitionFullReplicaAssignment(partition, ReplicaAssignment(replicas))
       leaderIndex += 1
     }
+
+    context.allTopics ++= topics
+    for (topic <- topics if topicIds.contains(topic)) {

Review comment:
       Couldn't we just do this instead of the for loop:
   ```
    topicIds.foreach { case (name, id) => context.addTopicId(name, id) }
   ```

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       We seem to be adding to the topicIds/topicNames maps, but never removing anything.
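
One possible shape for a fix, as a sketch only: merge the new IDs over the existing ones, then keep only entries for topics that are still known. `TopicIdMerge`, its `merge` method, and the `liveTopics` parameter are hypothetical names, and how the set of live topics would be obtained in `MetadataCache` is an assumption not covered here. `java.util.UUID` again stands in for Kafka's `Uuid`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical helper for illustration only; not part of Kafka's MetadataCache.
// java.util.UUID is a stand-in for org.apache.kafka.common.Uuid.
final class TopicIdMerge {
    private TopicIdMerge() {
    }

    // Returns a new map: current IDs overlaid with updates, restricted to liveTopics.
    // The input maps are not modified.
    static Map<String, UUID> merge(Map<String, UUID> current,
                                   Map<String, UUID> updates,
                                   Set<String> liveTopics) {
        Map<String, UUID> merged = new HashMap<>(current);
        merged.putAll(updates);
        // keySet() is a view of the map, so retainAll drops the entries
        // of topics that are no longer live.
        merged.keySet().retainAll(liveTopics);
        return merged;
    }
}
```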

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -372,6 +380,8 @@ class MetadataCache(brokerId: Int) extends Logging {
   }
 
   case class MetadataSnapshot(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                              topicIds: Map[String, Uuid],
+                              topicNames: Map[Uuid, String],

Review comment:
       Same question as for Cluster: do we have a use case that requires both maps to be stored for fast lookups? Or would it be sufficient to store the mapping one way and scan that single map for the reverse lookup?



