Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/10 16:02:24 UTC

[GitHub] [kafka] dajac commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

dajac commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540218952



##########
File path: clients/src/main/resources/common/message/LeaderAndIsrRequest.json
##########
@@ -21,8 +21,12 @@
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
   //
-  // Version 3 adds AddingReplicas and RemovingReplicas
-  "validVersions": "0-4",
+  // Version 3 adds AddingReplicas and RemovingReplicas.
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 adds Topic ID to the TopicStates.

Review comment:
       The `Type` field is also added in version 5. I would also mention the KIP for reference.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.

Review comment:
       nit: The word `and` is duplicated.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrRequest.json
##########
@@ -31,6 +35,8 @@
       "about": "The current controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
       "about": "The current broker epoch." },
+    { "name":  "Type", "type":  "int8", "versions": "5+",

Review comment:
       nit: There are two spaces before `Type` and `int8`.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
-      "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
-        "about": "The topic name." },
+    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
+      "about": "Each partition in v0 to v4 message."},
+    {"name":  "Topics", "type":  "[]LeaderAndIsrTopicError", "versions": "5+",

Review comment:
       nit: There are two spaces before `Topics` and `[]LeaderAndIsrTopicError`. We could also add a space before `name` to remain consistent with the other fields.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData);
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();

Review comment:
       nit: Could we directly allocate the `ArrayList` with the correct capacity? The same for `partitions` above and below.
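A minimal sketch of the pre-sizing suggestion, assuming the partition states are available as a `List`. If `partitionStates()` only exposes an `Iterable`, the per-topic partition lists are a natural place to take the capacity from instead. `PartitionState`, `PartitionError`, and `PresizedErrors` are hypothetical simplified types, not Kafka's generated message classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the generated LeaderAndIsrPartitionState and
// LeaderAndIsrPartitionError classes; the real ones carry more fields.
final class PartitionState {
    final String topicName;
    final int partitionIndex;

    PartitionState(String topicName, int partitionIndex) {
        this.topicName = topicName;
        this.partitionIndex = partitionIndex;
    }
}

final class PartitionError {
    final String topicName;
    final int partitionIndex;
    final short errorCode;

    PartitionError(String topicName, int partitionIndex, short errorCode) {
        this.topicName = topicName;
        this.partitionIndex = partitionIndex;
        this.errorCode = errorCode;
    }
}

final class PresizedErrors {
    // One error entry is produced per partition state, so the final size is
    // known up front and the backing array never has to grow.
    static List<PartitionError> errorsFor(List<PartitionState> states, short errorCode) {
        List<PartitionError> errors = new ArrayList<>(states.size());
        for (PartitionState state : states) {
            errors.add(new PartitionError(state.topicName, state.partitionIndex, errorCode));
        }
        return errors;
    }
}
```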

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData);
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();
+        for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+            LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
+            topicError.setTopicId(topicIds().get(topicState.topicName()));

Review comment:
       `topicIds()` recomputes the `Map` on every call, so it would be better to compute it once and keep a local reference to it.
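A minimal sketch of hoisting the map out of the loop. `RequestSketch` is a hypothetical stand-in whose `topicIds()` deliberately rebuilds the map on each call (and counts the calls) to mimic the cost described above; `java.util.UUID` replaces Kafka's `Uuid` so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical, simplified request: topicIds() rebuilds its map on every call
// to mimic the cost the review comment points out.
final class RequestSketch {
    private final Map<String, UUID> idsByTopicName;
    int topicIdsCalls = 0; // how many times the map was rebuilt

    RequestSketch(Map<String, UUID> idsByTopicName) {
        this.idsByTopicName = new LinkedHashMap<>(idsByTopicName);
    }

    Map<String, UUID> topicIds() {
        topicIdsCalls++;
        return new LinkedHashMap<>(idsByTopicName);
    }

    // Collects the topic ID for each topic state, in order.
    List<UUID> errorResponseTopicIds(List<String> topicStateNames) {
        // Compute the map once, outside the loop, and reuse the local reference.
        Map<String, UUID> topicIds = topicIds();
        List<UUID> result = new ArrayList<>(topicStateNames.size());
        for (String topicName : topicStateNames) {
            result.add(topicIds.get(topicName));
        }
        return result;
    }
}
```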

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
##########
@@ -116,8 +118,13 @@ public void testVersionLogic() {
                 new Node(0, "host0", 9090),
                 new Node(1, "host1", 9091)
             );
+
+            HashMap<String, Uuid> topicIds = new HashMap<>();

Review comment:
       nit: Declare this as `Map<String, Uuid>` rather than `HashMap<String, Uuid>`. I have seen this in a couple of places.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -45,8 +47,16 @@ public LeaderAndIsrResponse(Struct struct, short version) {
         this.data = new LeaderAndIsrResponseData(struct, version);
     }
 
-    public List<LeaderAndIsrPartitionError> partitions() {
-        return data.partitionErrors();
+    public List<LeaderAndIsrTopicError> topics() {
+        return this.data.topics();
+    }
+
+    public Iterable<LeaderAndIsrPartitionError> partitions() {
+        if (data.topics().isEmpty()) {
+            return data.partitionErrors();
+        }
+        return () -> new FlattenedIterator<>(data.topics().iterator(),
+            topic -> topic.partitionErrors().iterator());

Review comment:
       It would be better to explicitly handle the version here instead of relying on whether `topics()` is empty. It is easier for the reader to reason about, and it makes the handling explicit instead of implicit.
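A sketch of the explicit version check, under two assumptions not taken from the PR: the response object knows the version it was built or parsed with, and the flattened result may be copied eagerly (the diff uses a lazy `FlattenedIterator`, which avoids the copy). All types here are hypothetical simplifications of the generated classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified message types; Kafka generates the real ones.
final class PartitionError {
    final int partitionIndex;
    final short errorCode;

    PartitionError(int partitionIndex, short errorCode) {
        this.partitionIndex = partitionIndex;
        this.errorCode = errorCode;
    }
}

final class TopicError {
    final List<PartitionError> partitionErrors;

    TopicError(List<PartitionError> partitionErrors) {
        this.partitionErrors = partitionErrors;
    }
}

final class ResponseSketch {
    // Version 5 is where the diff moves partition errors under Topics.
    static final short FIRST_TOPIC_ID_VERSION = 5;

    private final short version; // assumption: the response knows its version
    private final List<PartitionError> partitionErrors; // used for v0-v4
    private final List<TopicError> topics;              // used for v5+

    ResponseSketch(short version, List<PartitionError> partitionErrors, List<TopicError> topics) {
        this.version = version;
        this.partitionErrors = partitionErrors;
        this.topics = topics;
    }

    List<PartitionError> partitions() {
        // Branch on the version itself, so the reader sees which field each
        // version populates without inferring it from an empty list.
        if (version < FIRST_TOPIC_ID_VERSION) {
            return partitionErrors;
        }
        // Eagerly flatten the per-topic lists, preserving order.
        List<PartitionError> all = new ArrayList<>();
        for (TopicError topic : topics) {
            all.addAll(topic.partitionErrors);
        }
        return all;
    }
}
```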

##########
File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
##########
@@ -482,8 +483,13 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           _.node(config.interBrokerListenerName)
         }
         val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+        val topicIds = leaderAndIsrPartitionStates.keys
+          .map(_.topic)
+          .toSet
+          .map((topic: String) => (topic, controllerContext.topicIds(topic)))

Review comment:
       nit: `(topic: String)` -> `topic`. We rarely specify the type in lambdas.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
##########
@@ -57,29 +60,32 @@ public void testErrorCountsFromGetErrorResponse() {
             .setZkVersion(20)
             .setReplicas(Collections.singletonList(10))
             .setIsNew(false));
+        HashMap<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("foo", Uuid.randomUuid());
+
         LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(),
-                15, 20, 0, partitionStates, Collections.emptySet()).build();
+                15, 20, 0, partitionStates, topicIds, Collections.emptySet()).build();
         LeaderAndIsrResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
         assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 2), response.errorCounts());
     }
 
     @Test
     public void testErrorCountsWithTopLevelError() {
-        List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
-            asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
+        Uuid id = Uuid.randomUuid();
+        List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
         LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
             .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
-            .setPartitionErrors(partitions));
+            .setTopics(topics));
         assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 2), response.errorCounts());

Review comment:
       Should we keep testing the older versions as well? The tests now only cover the newest version.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -72,6 +73,8 @@ class ControllerChannelManagerTest {
     assertEquals(1, updateMetadataRequests.size)
 
     val leaderAndIsrRequest = leaderAndIsrRequests.head
+    val topicIds = leaderAndIsrRequest.topicIds();
+    val topicNames = topicIds.asScala.map{ case (k,v) => (v, k)}

Review comment:
       nit: We usually put a space before `{` and `}` when we use curly braces inline. We could also add a space after the comma.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -87,7 +90,10 @@ class ControllerChannelManagerTest {
     val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) = batch.sentEvents.head
     assertEquals(2, brokerId)
     assertEquals(partitions.keySet,
-      leaderAndIsrResponse.partitions.asScala.map(p => new TopicPartition(p.topicName, p.partitionIndex)).toSet)
+      leaderAndIsrResponse.topics.asScala.map(t => t.partitionErrors.asScala.map(p =>
+        new TopicPartition(topicNames.get(t.topicId).get, p.partitionIndex))).flatMap(f => f).toSet)

Review comment:
       * `topicNames.get(t.topicId).get` -> `topicNames(t.topicId)`. It is a bit more concise when you know that the `Map` contains the key you are looking up.
       * `flatMap(f => f)` looks weird. I suppose that we could use `flatMap` instead of the first `map`.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -818,15 +825,18 @@ class ControllerChannelManagerTest {
   private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
     sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest =>
       val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
-      val partitionErrors = leaderAndIsrRequest.partitionStates.asScala.map(p =>
-        new LeaderAndIsrPartitionError()
-          .setTopicName(p.topicName)
-          .setPartitionIndex(p.partitionIndex)
-          .setErrorCode(error.code))
+      val topicIds = leaderAndIsrRequest.topicIds()
+      val topicErrors = leaderAndIsrRequest.data.topicStates().asScala.map(t =>

Review comment:
       nit: The parentheses after `topicStates`, `partitionStates`, and `partitionIndex` are not mandatory. We tend to omit them when they are not required.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
-      "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
-        "about": "The topic name." },
+    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
+      "about": "Each partition in v0 to v4 message."},
+    {"name":  "Topics", "type":  "[]LeaderAndIsrTopicError", "versions": "5+",
+      "about": "Each topic", "fields": [
+      { "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" },
+      { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",

Review comment:
       Should the version of `PartitionErrors` be `5+`? 

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
##########
@@ -51,15 +53,15 @@
     public void testUnsupportedVersion() {
         LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(
                 (short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0,
-                Collections.emptyList(), Collections.emptySet());
+                Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());
         assertThrows(UnsupportedVersionException.class, builder::build);
     }
 
     @Test
     public void testGetErrorResponse() {
         for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
             LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
-                    Collections.emptyList(), Collections.emptySet());
+                    Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());

Review comment:
       Shouldn't we verify that topic ids are correctly set in the generated response as well?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -58,8 +68,17 @@ public Errors error() {
         Errors error = error();
         if (error != Errors.NONE)
             // Minor optimization since the top-level error applies to all partitions
-            return Collections.singletonMap(error, data.partitionErrors().size());
-        return errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+            if (data.topics().isEmpty()) {
+                return Collections.singletonMap(error, data.partitionErrors().size());
+            } else {
+                return Collections.singletonMap(error,
+                        data.topics().stream().mapToInt(t -> t.partitionErrors().size()).sum());
+            }
+        if (data.topics().isEmpty()) {
+            return errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+        }
+        return errorCounts(data.topics().stream().flatMap(t -> t.partitionErrors().stream()).map(l ->
+                Errors.forCode(l.errorCode())));

Review comment:
       Ditto here. It would be better to handle the version explicitly.
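The same idea applied to `errorCounts()`, as a hypothetical static helper over plain `short` error codes rather than `Errors`: the version decides which list supplies the partition-level codes, and only then is the top-level shortcut applied. As in the diff, a non-zero top-level error is counted once per partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ErrorCountsSketch {
    // Version 5 is where the diff moves partition errors under Topics.
    static final short FIRST_TOPIC_ID_VERSION = 5;
    static final short NO_ERROR = 0;

    // Hypothetical signature: v0-v4 responses supply a flat list of codes,
    // v5+ responses supply one list of codes per topic.
    static Map<Short, Integer> errorCounts(short version, short topLevelError,
                                           List<Short> flatPartitionErrors,
                                           List<List<Short>> perTopicPartitionErrors) {
        // Choose the partition-level codes by version, not by which list is empty.
        List<Short> codes;
        if (version < FIRST_TOPIC_ID_VERSION) {
            codes = flatPartitionErrors;
        } else {
            codes = new ArrayList<>();
            for (List<Short> topic : perTopicPartitionErrors) {
                codes.addAll(topic);
            }
        }
        if (topLevelError != NO_ERROR) {
            // The top-level error applies to every partition.
            return Collections.singletonMap(topLevelError, codes.size());
        }
        Map<Short, Integer> counts = new HashMap<>();
        for (Short code : codes) {
            counts.merge(code, 1, Integer::sum);
        }
        return counts;
    }
}
```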

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.IsolationLevel;

Review comment:
       It would be good to verify that all versions are tested in `testSerialization`.
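One way to make full coverage explicit is to iterate inclusively up to the latest version. Note that the `testGetErrorResponse` loop in the diff above is bounded by `version < LEADER_AND_ISR.latestVersion()`, so it stops before the latest version. `VersionCoverage.failingVersions` is a hypothetical helper, not part of Kafka's test utilities, and the per-version check (for example, a serialize/deserialize round trip) is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

final class VersionCoverage {
    // Runs the check for every version from oldest to latest, inclusive, and
    // returns the versions for which it failed. The <= bound is the point:
    // a loop written as `version < latest` never exercises the newest version.
    static List<Short> failingVersions(short oldest, short latest, IntPredicate check) {
        List<Short> failed = new ArrayList<>();
        for (short version = oldest; version <= latest; version++) {
            if (!check.test(version)) {
                failed.add(version);
            }
        }
        return failed;
    }
}
```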

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
##########
@@ -57,29 +60,32 @@ public void testErrorCountsFromGetErrorResponse() {
             .setZkVersion(20)
             .setReplicas(Collections.singletonList(10))
             .setIsNew(false));
+        HashMap<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("foo", Uuid.randomUuid());

Review comment:
       nit: You could use `Collections.singletonMap` here.
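For illustration, the suggestion combined with the earlier `Map` versus `HashMap` nit. This assumes nothing downstream mutates the map, since `Collections.singletonMap` returns an immutable map; `SingletonTopicIds` is a hypothetical helper and `java.util.UUID` stands in for Kafka's `Uuid`.

```java
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

final class SingletonTopicIds {
    // java.util.UUID stands in for org.apache.kafka.common.Uuid in this sketch.
    static Map<String, UUID> forTopic(String topic) {
        // One-entry, immutable map, declared through the Map interface
        // rather than as a HashMap.
        return Collections.singletonMap(topic, UUID.randomUUID());
    }
}
```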

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -157,7 +163,8 @@ class ControllerChannelManagerTest {
 
     for (apiVersion <- ApiVersion.allVersions) {
       val leaderAndIsrRequestVersion: Short =
-        if (apiVersion >= KAFKA_2_4_IV1) 4
+        if (apiVersion >= KAFKA_2_8_IV0) 5

Review comment:
       I wonder if we should extend `testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion` to verify the topic IDs based on the different supported versions. What do you think?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get

Review comment:
       Do we need to handle the case where the topic no longer exists by the time the response is received? If not, we could use `controllerContext.topicNames(topic.topicId)`.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {

Review comment:
       It may be better to handle the version explicitly here as well. Alternatively, we could push this into `LeaderAndIsrResponse` and provide a method `Map<TopicPartition, ...> partitions()` which handles the version.
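A minimal sketch of the second alternative, written as a static helper over hypothetical simplified types rather than as a method on the real `LeaderAndIsrResponse`. It assumes the response knows the version it was parsed with and that the caller supplies a topic-ID-to-name map (on the controller side, something like `controllerContext.topicNames`). It also shows one possible answer to the earlier question about topics that disappear before the response arrives: unresolved IDs are skipped rather than treated as fatal.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-ins: TopicPartitionKey mirrors Kafka's TopicPartition,
// java.util.UUID stands in for Kafka's Uuid, and error codes are plain shorts.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

final class PartitionErr {
    final String topicName; // only populated in v0-v4 responses
    final int partitionIndex;
    final short errorCode;

    PartitionErr(String topicName, int partitionIndex, short errorCode) {
        this.topicName = topicName;
        this.partitionIndex = partitionIndex;
        this.errorCode = errorCode;
    }
}

final class TopicErr {
    final UUID topicId;
    final List<PartitionErr> partitionErrors;

    TopicErr(UUID topicId, List<PartitionErr> partitionErrors) {
        this.topicId = topicId;
        this.partitionErrors = partitionErrors;
    }
}

final class ResolvedPartitionErrors {
    // Version 5 is where the diff replaces TopicName with TopicId.
    static final short FIRST_TOPIC_ID_VERSION = 5;

    // Returns the error code per partition, keyed by topic name, for any version.
    static Map<TopicPartitionKey, Short> partitionErrors(short version, List<PartitionErr> flat,
                                                         List<TopicErr> topics, Map<UUID, String> topicNames) {
        Map<TopicPartitionKey, Short> result = new HashMap<>();
        if (version < FIRST_TOPIC_ID_VERSION) {
            // v0-v4: each partition error carries its own topic name.
            for (PartitionErr p : flat) {
                result.put(new TopicPartitionKey(p.topicName, p.partitionIndex), p.errorCode);
            }
            return result;
        }
        // v5+: resolve each topic ID to a name; skip IDs that are no longer known.
        for (TopicErr t : topics) {
            String name = topicNames.get(t.topicId);
            if (name == null) {
                continue;
            }
            for (PartitionErr p : t.partitionErrors) {
                result.put(new TopicPartitionKey(name, p.partitionIndex), p.errorCode);
            }
        }
        return result;
    }
}
```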




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org