Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/19 23:48:54 UTC

[GitHub] [kafka] jolshan opened a new pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

jolshan opened a new pull request #9626:
URL: https://github.com/apache/kafka/pull/9626


   This change takes the topic IDs created in https://github.com/apache/kafka/pull/9473 and propagates them to brokers using the LeaderAndIsr request. It also removes the topic name from the LeaderAndIsr response, reorganizes the response so that partitions are grouped by topic, and includes the topic ID.
   
   In addition, the topic ID is persisted to each replica in Log as well as in a file on disk. This file is read on startup and if the topic ID exists, it will be reloaded. 
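
   The persist-and-reload behaviour described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the class name, the file name `partition.metadata`, and the `version:` / `topic_id:` line format are hypothetical and not taken from this description, and `java.util.UUID` stands in for Kafka's own `Uuid` type.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;

final class PartitionMetadataSketch {
    // Hypothetical file name and line format, chosen only for illustration.
    static final String FILE_NAME = "partition.metadata";
    static final String TOPIC_ID_PREFIX = "topic_id: ";
    static final int VERSION = 0;

    // Writes the topic ID into the given log directory, replacing any earlier file.
    static void write(Path logDir, UUID topicId) {
        String content = "version: " + VERSION + "\n" + TOPIC_ID_PREFIX + topicId + "\n";
        try {
            Files.write(logDir.resolve(FILE_NAME), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the stored topic ID, or empty if no file (or no topic ID line) exists.
    static Optional<UUID> read(Path logDir) {
        Path file = logDir.resolve(FILE_NAME);
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                if (line.startsWith(TOPIC_ID_PREFIX)) {
                    String raw = line.substring(TOPIC_ID_PREFIX.length()).trim();
                    return Optional.of(UUID.fromString(raw));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return Optional.empty();
    }
}
```

   On startup, a caller would invoke `read` on each log directory and reuse the returned ID when one is present.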
   
   This PR bumps the inter-broker protocol version (IBP) and is expected to be merged at the same time as https://github.com/apache/kafka/pull/9622 so as not to bump the protocol twice.
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540513762



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {

Review comment:
       Yeah. One option I thought of would be to do something like partitions(Map<id, string> topicNames) and handle it inside the response with the version. That might be a little cleaner but I'm not sure.







[GitHub] [kafka] dajac commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540218952



##########
File path: clients/src/main/resources/common/message/LeaderAndIsrRequest.json
##########
@@ -21,8 +21,12 @@
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
   //
-  // Version 3 adds AddingReplicas and RemovingReplicas
-  "validVersions": "0-4",
+  // Version 3 adds AddingReplicas and RemovingReplicas.
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 adds Topic ID to the TopicStates.

Review comment:
       `Type` field is also added. I would also mention the KIP for reference.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.

Review comment:
       nit: There are two consecutive `and`.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrRequest.json
##########
@@ -31,6 +35,8 @@
       "about": "The current controller epoch." },
     { "name": "BrokerEpoch", "type": "int64", "versions": "2+", "ignorable": true, "default": "-1",
       "about": "The current broker epoch." },
+    { "name":  "Type", "type":  "int8", "versions": "5+",

Review comment:
       nit: There are two spaces before `Type` and `int8`.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
-      "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
-        "about": "The topic name." },
+    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
+      "about": "Each partition in v0 to v4 message."},
+    {"name":  "Topics", "type":  "[]LeaderAndIsrTopicError", "versions": "5+",

Review comment:
       nit: There are two spaces before `Topics` and `[]LeaderAndIsrTopicError`. We could also add a space before `name` to remain consistent with the other fields.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData);
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();

Review comment:
       nit: Could we directly allocate the `ArrayList` with the correct capacity? The same for `partitions` above and below.
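
       A minimal sketch of this suggestion, assuming a hypothetical `TopicError` stand-in rather than the generated Kafka message classes: when the number of elements is known before the loop, passing it to the `ArrayList` constructor avoids resizing the backing array while the list is filled.

```java
import java.util.ArrayList;
import java.util.List;

final class PresizedListSketch {
    // Hypothetical stand-in for a generated per-topic error message class.
    static final class TopicError {
        final String topic;
        final short errorCode;

        TopicError(String topic, short errorCode) {
            this.topic = topic;
            this.errorCode = errorCode;
        }
    }

    // Builds one error entry per topic; the result size is known before the loop starts.
    static List<TopicError> errorsFor(List<String> topics, short errorCode) {
        // Initial capacity equals the final size, so the list never has to grow.
        List<TopicError> errors = new ArrayList<>(topics.size());
        for (String topic : topics) {
            errors.add(new TopicError(topic, errorCode));
        }
        return errors;
    }
}
```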

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData);
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();
+        for (LeaderAndIsrTopicState topicState : data.topicStates()) {
+            LeaderAndIsrTopicError topicError = new LeaderAndIsrTopicError();
+            topicError.setTopicId(topicIds().get(topicState.topicName()));

Review comment:
       `topicIds()` recomputes the `Map` so it would be better to keep a local reference to it.
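
       To illustrate the point, here is a small sketch with a hypothetical `Request` class whose `topicIds()` accessor rebuilds a map on every call (the counter exists only to make that cost visible, and `java.util.UUID` stands in for Kafka's `Uuid`). Hoisting the call out of the loop means the map is built once instead of once per topic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class LocalMapReferenceSketch {
    // Hypothetical request type: topicIds() recomputes its result on each call.
    static final class Request {
        private final List<String> topicNames;
        private final List<UUID> topicIdList;
        int topicIdsCalls = 0; // instrumentation for the example only

        Request(List<String> topicNames, List<UUID> topicIdList) {
            this.topicNames = topicNames;
            this.topicIdList = topicIdList;
        }

        Map<String, UUID> topicIds() {
            topicIdsCalls++;
            Map<String, UUID> ids = new HashMap<>();
            for (int i = 0; i < topicNames.size(); i++) {
                ids.put(topicNames.get(i), topicIdList.get(i));
            }
            return ids;
        }
    }

    // Looks up the ID of every topic while calling topicIds() exactly once.
    static List<UUID> idsInOrder(Request request) {
        Map<String, UUID> topicIds = request.topicIds(); // local reference, computed once
        List<UUID> result = new ArrayList<>(request.topicNames.size());
        for (String name : request.topicNames) {
            result.add(topicIds.get(name));
        }
        return result;
    }
}
```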

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
##########
@@ -116,8 +118,13 @@ public void testVersionLogic() {
                 new Node(0, "host0", 9090),
                 new Node(1, "host1", 9091)
             );
+
+            HashMap<String, Uuid> topicIds = new HashMap<>();

Review comment:
       nit: `HashMap<String, Uuid>` to `Map<String, Uuid>`. I have seen this in a couple of places.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -45,8 +47,16 @@ public LeaderAndIsrResponse(Struct struct, short version) {
         this.data = new LeaderAndIsrResponseData(struct, version);
     }
 
-    public List<LeaderAndIsrPartitionError> partitions() {
-        return data.partitionErrors();
+    public List<LeaderAndIsrTopicError> topics() {
+        return this.data.topics();
+    }
+
+    public Iterable<LeaderAndIsrPartitionError> partitions() {
+        if (data.topics().isEmpty()) {
+            return data.partitionErrors();
+        }
+        return () -> new FlattenedIterator<>(data.topics().iterator(),
+            topic -> topic.partitionErrors().iterator());

Review comment:
       It would be better to explicitly handle the version here instead of relying on `topics()` to be empty or not. It is easier to reason about for the reader and it also makes the handling very explicit instead of being implicit.
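
       One possible shape for this, sketched with hypothetical stand-in classes rather than the generated `LeaderAndIsrResponseData` types: the response remembers its version and branches on it, so an empty version 5 response is never mistaken for an older one. The threshold of 5 follows the `version() < 5` check used in the request class in this PR.

```java
import java.util.ArrayList;
import java.util.List;

final class VersionedPartitionsSketch {
    // Hypothetical stand-ins for the generated response message classes.
    static final class PartitionError {
        final int partitionIndex;
        final short errorCode;

        PartitionError(int partitionIndex, short errorCode) {
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
        }
    }

    static final class TopicError {
        final List<PartitionError> partitionErrors;

        TopicError(List<PartitionError> partitionErrors) {
            this.partitionErrors = partitionErrors;
        }
    }

    private final short version;
    private final List<PartitionError> partitionErrors; // flat list, versions 0-4
    private final List<TopicError> topics;              // grouped by topic, version 5+

    VersionedPartitionsSketch(short version, List<PartitionError> partitionErrors, List<TopicError> topics) {
        this.version = version;
        this.partitionErrors = partitionErrors;
        this.topics = topics;
    }

    // Branches on the version rather than on whether `topics` happens to be empty.
    List<PartitionError> partitions() {
        if (version < 5) {
            return partitionErrors;
        }
        List<PartitionError> flattened = new ArrayList<>();
        for (TopicError topic : topics) {
            flattened.addAll(topic.partitionErrors);
        }
        return flattened;
    }
}
```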

##########
File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
##########
@@ -482,8 +483,13 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
           _.node(config.interBrokerListenerName)
         }
         val brokerEpoch = controllerContext.liveBrokerIdAndEpochs(broker)
+        val topicIds = leaderAndIsrPartitionStates.keys
+          .map(_.topic)
+          .toSet
+          .map((topic: String) => (topic, controllerContext.topicIds(topic)))

Review comment:
       nit: `(topic: String)` -> `topic`. We rarely specify the type in lambdas.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
##########
@@ -57,29 +60,32 @@ public void testErrorCountsFromGetErrorResponse() {
             .setZkVersion(20)
             .setReplicas(Collections.singletonList(10))
             .setIsNew(false));
+        HashMap<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("foo", Uuid.randomUuid());
+
         LeaderAndIsrRequest request = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion(),
-                15, 20, 0, partitionStates, Collections.emptySet()).build();
+                15, 20, 0, partitionStates, topicIds, Collections.emptySet()).build();
         LeaderAndIsrResponse response = request.getErrorResponse(0, Errors.CLUSTER_AUTHORIZATION_FAILED.exception());
         assertEquals(Collections.singletonMap(Errors.CLUSTER_AUTHORIZATION_FAILED, 2), response.errorCounts());
     }
 
     @Test
     public void testErrorCountsWithTopLevelError() {
-        List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
-            asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
+        Uuid id = Uuid.randomUuid();
+        List<LeaderAndIsrTopicError> topics = createTopic(id, asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
         LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
             .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
-            .setPartitionErrors(partitions));
+            .setTopics(topics));
         assertEquals(Collections.singletonMap(Errors.UNKNOWN_SERVER_ERROR, 2), response.errorCounts());

Review comment:
       Should we keep testing the older version as well? Tests assume the newest version only now.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -72,6 +73,8 @@ class ControllerChannelManagerTest {
     assertEquals(1, updateMetadataRequests.size)
 
     val leaderAndIsrRequest = leaderAndIsrRequests.head
+    val topicIds = leaderAndIsrRequest.topicIds();
+    val topicNames = topicIds.asScala.map{ case (k,v) => (v, k)}

Review comment:
       nit: We usually put a space before `{` and `}` when we use curly braces inline. We could also add a space after the comma.

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -87,7 +90,10 @@ class ControllerChannelManagerTest {
     val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) = batch.sentEvents.head
     assertEquals(2, brokerId)
     assertEquals(partitions.keySet,
-      leaderAndIsrResponse.partitions.asScala.map(p => new TopicPartition(p.topicName, p.partitionIndex)).toSet)
+      leaderAndIsrResponse.topics.asScala.map(t => t.partitionErrors.asScala.map(p =>
+        new TopicPartition(topicNames.get(t.topicId).get, p.partitionIndex))).flatMap(f => f).toSet)

Review comment:
       * `topicNames.get(t.topicId).get` -> `topicNames(t.topicId)`. It is a bit more concise when you know that the Map contains what you are looking up.
   * `flatMap(f => f)` looks weird. I suppose that we could use `flatMap` instead of the first `map`.
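
       The second point, sketched here in Java streams rather than in the Scala of the test: a single `flatMap` over the topics replaces a `map` followed by an identity `flatMap`. The class and method names are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and a `"topic-partition"` string stands in for `TopicPartition`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

final class FlatMapSketch {
    // Resolves each topic ID to its name and emits one "topic-partition" string per partition.
    static Set<String> topicPartitions(Map<UUID, List<Integer>> partitionsByTopicId,
                                       Map<UUID, String> topicNames) {
        return partitionsByTopicId.entrySet().stream()
            // flatMap both transforms and flattens, so no separate flattening step is needed.
            .flatMap(entry -> entry.getValue().stream()
                .map(partition -> topicNames.get(entry.getKey()) + "-" + partition))
            .collect(Collectors.toSet());
    }
}
```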

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -818,15 +825,18 @@ class ControllerChannelManagerTest {
   private def applyLeaderAndIsrResponseCallbacks(error: Errors, sentRequests: List[SentRequest]): Unit = {
     sentRequests.filter(_.request.apiKey == ApiKeys.LEADER_AND_ISR).filter(_.responseCallback != null).foreach { sentRequest =>
       val leaderAndIsrRequest = sentRequest.request.build().asInstanceOf[LeaderAndIsrRequest]
-      val partitionErrors = leaderAndIsrRequest.partitionStates.asScala.map(p =>
-        new LeaderAndIsrPartitionError()
-          .setTopicName(p.topicName)
-          .setPartitionIndex(p.partitionIndex)
-          .setErrorCode(error.code))
+      val topicIds = leaderAndIsrRequest.topicIds()
+      val topicErrors = leaderAndIsrRequest.data.topicStates().asScala.map(t =>

Review comment:
       nit: The parentheses after `topicStates`, `partitionStates`, and `partitionIndex` are not mandatory. We tend to omit them when they are not required.

##########
File path: clients/src/main/resources/common/message/LeaderAndIsrResponse.json
##########
@@ -22,15 +22,28 @@
   // Version 2 is the same as version 1.
   //
   // Version 3 is the same as version 2.
-  "validVersions": "0-4",
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 removes TopicName and replaces it with TopicId and and reorganizes the partitions by topic.
+  "validVersions": "0-5",
   "flexibleVersions": "4+",
   "fields": [
     { "name": "ErrorCode", "type": "int16", "versions": "0+",
       "about": "The error code, or 0 if there was no error." },
-    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",
-      "about": "Each partition.", "fields": [
-      { "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
-        "about": "The topic name." },
+    { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0-4",
+      "about": "Each partition in v0 to v4 message."},
+    {"name":  "Topics", "type":  "[]LeaderAndIsrTopicError", "versions": "5+",
+      "about": "Each topic", "fields": [
+      { "name": "TopicId", "type": "uuid", "versions": "5+", "about": "The unique topic ID" },
+      { "name": "PartitionErrors", "type": "[]LeaderAndIsrPartitionError", "versions": "0+",

Review comment:
       Should the version of `PartitionErrors` be `5+`? 

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrRequestTest.java
##########
@@ -51,15 +53,15 @@
     public void testUnsupportedVersion() {
         LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(
                 (short) (LEADER_AND_ISR.latestVersion() + 1), 0, 0, 0,
-                Collections.emptyList(), Collections.emptySet());
+                Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());
         assertThrows(UnsupportedVersionException.class, builder::build);
     }
 
     @Test
     public void testGetErrorResponse() {
         for (short version = LEADER_AND_ISR.oldestVersion(); version < LEADER_AND_ISR.latestVersion(); version++) {
             LeaderAndIsrRequest.Builder builder = new LeaderAndIsrRequest.Builder(version, 0, 0, 0,
-                    Collections.emptyList(), Collections.emptySet());
+                    Collections.emptyList(), Collections.emptyMap(), Collections.emptySet());

Review comment:
       Shouldn't we verify that topic ids are correctly set in the generated response as well?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -58,8 +68,17 @@ public Errors error() {
         Errors error = error();
         if (error != Errors.NONE)
             // Minor optimization since the top-level error applies to all partitions
-            return Collections.singletonMap(error, data.partitionErrors().size());
-        return errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+            if (data.topics().isEmpty()) {
+                return Collections.singletonMap(error, data.partitionErrors().size());
+            } else {
+                return Collections.singletonMap(error,
+                        data.topics().stream().mapToInt(t -> t.partitionErrors().size()).sum());
+            }
+        if (data.topics().isEmpty()) {
+            return errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+        }
+        return errorCounts(data.topics().stream().flatMap(t -> t.partitionErrors().stream()).map(l ->
+                Errors.forCode(l.errorCode())));

Review comment:
       Ditto here. It would be better to be explicit about how the version is handled.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -21,6 +21,7 @@
 import org.apache.kafka.common.IsolationLevel;

Review comment:
       It would be good to verify that all versions are tested in `testSerialization`.

##########
File path: clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
##########
@@ -57,29 +60,32 @@ public void testErrorCountsFromGetErrorResponse() {
             .setZkVersion(20)
             .setReplicas(Collections.singletonList(10))
             .setIsNew(false));
+        HashMap<String, Uuid> topicIds = new HashMap<>();
+        topicIds.put("foo", Uuid.randomUuid());

Review comment:
       nit: You could use `Collections.singletonMap` here.
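
       For reference, a short sketch of what that looks like, with `java.util.UUID` standing in for Kafka's `Uuid` and a hypothetical helper name. Declaring the variable as `Map` rather than `HashMap` also keeps the test independent of the concrete implementation.

```java
import java.util.Collections;
import java.util.Map;
import java.util.UUID;

final class SingletonMapSketch {
    // Returns an immutable one-entry map from the topic name to a fresh random ID.
    static Map<String, UUID> topicIdsFor(String topic) {
        return Collections.singletonMap(topic, UUID.randomUUID());
    }
}
```

       Note that the returned map is immutable, so this only fits tests that do not add further topics afterwards.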

##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -157,7 +163,8 @@ class ControllerChannelManagerTest {
 
     for (apiVersion <- ApiVersion.allVersions) {
       val leaderAndIsrRequestVersion: Short =
-        if (apiVersion >= KAFKA_2_4_IV1) 4
+        if (apiVersion >= KAFKA_2_8_IV0) 5

Review comment:
       I wonder if we should extend `testLeaderAndIsrRequestFollowsInterBrokerProtocolVersion` to verify the topic IDs based on the different supported versions. What do you think?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get

Review comment:
       Do we need to handle the case when the topic may not be there anymore when the response is received? If not, we could use `controllerContext.topicNames(topic.topicId)`.

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {

Review comment:
       It may be better to also handle the version explicitly here. Alternatively, we could push this into the `LeaderAndIsrResponse` and provide a method `Map<TopicPartition, ...> partitions()` which handles the version.
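
       A rough sketch of the second alternative, under several assumptions: the classes are hypothetical stand-ins for the generated message types, a `"topic-partition"` string stands in for `TopicPartition`, `java.util.UUID` stands in for Kafka's `Uuid`, and the caller supplies the ID-to-name mapping because version 5 responses carry only topic IDs. Skipping topic IDs that have no known name is also an assumption, not something settled in this thread.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class PartitionErrorsByNameSketch {
    // Hypothetical stand-ins for the generated response message classes.
    static final class PartitionError {
        final String topicName; // only populated in versions 0-4
        final int partitionIndex;
        final short errorCode;

        PartitionError(String topicName, int partitionIndex, short errorCode) {
            this.topicName = topicName;
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
        }
    }

    static final class TopicError {
        final UUID topicId;
        final List<PartitionError> partitionErrors;

        TopicError(UUID topicId, List<PartitionError> partitionErrors) {
            this.topicId = topicId;
            this.partitionErrors = partitionErrors;
        }
    }

    // Returns error codes keyed by "topic-partition", hiding the version split from callers.
    static Map<String, Short> partitionErrors(short version,
                                              List<PartitionError> flatErrors,
                                              List<TopicError> topics,
                                              Map<UUID, String> topicNames) {
        Map<String, Short> errors = new HashMap<>();
        if (version < 5) {
            for (PartitionError p : flatErrors) {
                errors.put(p.topicName + "-" + p.partitionIndex, p.errorCode);
            }
        } else {
            for (TopicError topic : topics) {
                String name = topicNames.get(topic.topicId);
                if (name == null) {
                    continue; // assumption: ignore topics whose ID is no longer known
                }
                for (PartitionError p : topic.partitionErrors) {
                    errors.put(name + "-" + p.partitionIndex, p.errorCode);
                }
            }
        }
        return errors;
    }
}
```

       With a method of this shape, the controller would need a single loop over the returned map instead of one loop per response layout.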







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543485900



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get
+      topic.partitionErrors().forEach { partition =>
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp

Review comment:
       I ended up moving some of this to the response class so the duplicate logic is removed. I just haven't pushed yet
   







[GitHub] [kafka] rite2nikhil commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
rite2nikhil commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r539900024



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -58,8 +68,17 @@ public Errors error() {
         Errors error = error();
         if (error != Errors.NONE)
             // Minor optimization since the top-level error applies to all partitions
-            return Collections.singletonMap(error, data.partitionErrors().size());
-        return errorCounts(data.partitionErrors().stream().map(l -> Errors.forCode(l.errorCode())));
+            if (data.topics().isEmpty()) {
+                return Collections.singletonMap(error, data.partitionErrors().size());
+            } else {

Review comment:
       nit: Is the `else` statement needed? All it contains is a `return`.







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543488660



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1021,6 +1036,13 @@ class Log(@volatile private var _dir: File,
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
           initializeLeaderEpochCache()
+          if (!partitionMetadataFile.isEmpty && !partitionMetadataFile.get.isEmpty()) {
+            val partitionMetadata = partitionMetadataFile.get.read()
+            initializePartitionMetadata()
+            partitionMetadataFile.get.write(partitionMetadata.topicId)

Review comment:
       Perhaps my understanding is wrong, but if we rename the directory, do we need to create the file in the new directory, with the same topic ID as before?







[GitHub] [kafka] rajinisivaram merged pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
rajinisivaram merged pull request #9626:
URL: https://github.com/apache/kafka/pull/9626


   





[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540311108



##########
File path: clients/src/main/resources/common/message/LeaderAndIsrRequest.json
##########
@@ -21,8 +21,12 @@
   //
   // Version 2 adds broker epoch and reorganizes the partitions by topic.
   //
-  // Version 3 adds AddingReplicas and RemovingReplicas
-  "validVersions": "0-4",
+  // Version 3 adds AddingReplicas and RemovingReplicas.
+  //
+  // Version 4 is the first flexible version.
+  //
+  // Version 5 adds Topic ID to the TopicStates.

Review comment:
       Thanks for catching that. Will add the KIP too.







[GitHub] [kafka] dajac commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
dajac commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540322639



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {

Review comment:
       Actually, this may not work as we don't have the topic name in the latest version...







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r545009813



##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -2176,4 +2212,84 @@ class ReplicaManagerTest {
       replicaManager.shutdown(false)
     }
   }
+
+  @Test
+  def testPartitionMetadataFile() = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.randomUuid())
+
+      def leaderAndIsrRequest(epoch: Int): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(topic)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
+      assertTrue(!replicaManager.localLog(topicPartition).isEmpty)
+      val id = topicIds.get(topicPartition.topic())
+      val log = replicaManager.localLog(topicPartition).get
+      assertTrue(!log.partitionMetadataFile.isEmpty)
+      assertTrue(!log.partitionMetadataFile.get.isEmpty())
+      val partitionMetadata = log.partitionMetadataFile.get.read()
+
+      // Current version of PartitionMetadataFile is 0.
+      assertEquals(0, partitionMetadata.version)
+      assertEquals(id, partitionMetadata.topicId)
+    } finally replicaManager.shutdown(checkpointHW = false)
+  }
+
+  @Test
+  def testPartitionMetadataFileNotCreated() = {
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
+    try {
+      val brokerList = Seq[Integer](0, 1).asJava
+      val topicPartition = new TopicPartition(topic, 0)
+      replicaManager.createPartition(topicPartition)
+        .createLogIfNotExists(isNew = false, isFutureReplica = false,
+          new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints))
+      val topicIds = Collections.singletonMap(topic, Uuid.ZERO_UUID)
+
+      def leaderAndIsrRequest(epoch: Int, name: String): LeaderAndIsrRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
+        Seq(new LeaderAndIsrPartitionState()
+          .setTopicName(name)
+          .setPartitionIndex(0)
+          .setControllerEpoch(0)
+          .setLeader(0)
+          .setLeaderEpoch(epoch)
+          .setIsr(brokerList)
+          .setZkVersion(0)
+          .setReplicas(brokerList)
+          .setIsNew(true)).asJava,
+        topicIds,
+        Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
+
+      // The file has no contents if the topic does not have an associated topic ID.
+      replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0, "fakeTopic"), (_, _) => ())
+      assertTrue(!replicaManager.localLog(topicPartition).isEmpty)

Review comment:
       nit: `assertFalse` instead of `assertTrue(!...)`. There are a few of these below as well.







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540516535



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {

Review comment:
       There is LeaderAndIsrResponseTest.java (which doesn't try all versions) and testGetErrorResponse() in LeaderAndIsrRequestTest.java that does test all versions. I will add version tests to the former.
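
For readers following the version gate in this hunk, here is a minimal sketch of the two response shapes: a flat per-partition list below version 5, and partitions grouped under a topic ID from version 5 on. All names here (`ErrorResponseShapes`, `PartitionRef`, `flatErrors`, `groupedErrors`) are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and the zero-ID fallback for a topic without an ID is an assumption, not the PR's actual behavior.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch; none of these names come from the Kafka code base.
class ErrorResponseShapes {
    // java.util.UUID stands in for Kafka's Uuid; all-zero means "no topic ID".
    static final UUID ZERO_ID = new UUID(0L, 0L);

    // One (topic name, partition index) pair taken from the request.
    static final class PartitionRef {
        final String topic;
        final int partition;

        PartitionRef(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Versions below 5: a flat list with one "name-partition" entry per partition.
    static List<String> flatErrors(List<PartitionRef> partitions) {
        List<String> out = new ArrayList<>();
        for (PartitionRef p : partitions) {
            out.add(p.topic + "-" + p.partition);
        }
        return out;
    }

    // Version 5 and above: partition indexes grouped under the topic's ID.
    // Assumption: a topic with no known ID is filed under the zero ID.
    static Map<UUID, List<Integer>> groupedErrors(List<PartitionRef> partitions,
                                                  Map<String, UUID> topicIds) {
        Map<UUID, List<Integer>> out = new LinkedHashMap<>();
        for (PartitionRef p : partitions) {
            UUID id = topicIds.getOrDefault(p.topic, ZERO_ID);
            out.computeIfAbsent(id, k -> new ArrayList<>()).add(p.partition);
        }
        return out;
    }
}
```

A per-version test would build the same partition list once and assert on whichever shape the version under test is expected to produce.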







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544634608



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       So I thought through these cases some more and realized that the metadata file will fail to open if formatted incorrectly. So the only case where there could be data written to the file is if the ID is the zero UUID. So I decided to just fail on reading the file if the zero ID is provided. (We will never write zero ID to file.) The rest of this cleaned up pretty nicely.







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543691182



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get

Review comment:
       I moved this logic to LeaderAndIsrResponse, which is Java code, so it behaves a bit differently. (We may want to move it back though, and if so, I'll do this.) For now I do a null check in the Java code.
   
   One question I have though is what does it mean to not have a topic in the context? Is it possible to send a leaderAndIsrRequest with an unknown partition/one not in the context? I can only think maybe the topic gets deleted while handling the request. What would the normal error handling case be in that scenario?
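
A minimal sketch of the kind of null check described above, assuming the controller's ID-to-name mapping is exposed as a plain `Map`. The names `TopicNameLookup` and `resolve` are hypothetical, `java.util.UUID` stands in for Kafka's `Uuid`, and silently skipping an unknown ID is only one possible answer to the error-handling question, not a decision made in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch; not the LeaderAndIsrResponse implementation.
class TopicNameLookup {
    // Returns "name-partition" strings for a known topic ID.
    // An ID that is missing from the map yields an empty list instead of throwing.
    static List<String> resolve(Map<UUID, String> topicNames,
                                UUID topicId,
                                List<Integer> partitionIndexes) {
        List<String> resolved = new ArrayList<>();
        String name = topicNames.get(topicId); // null when the ID is not in the context
        if (name == null) {
            return resolved;
        }
        for (int index : partitionIndexes) {
            resolved.add(name + "-" + index);
        }
        return resolved;
    }
}
```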







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540354679



##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -157,7 +163,8 @@ class ControllerChannelManagerTest {
 
     for (apiVersion <- ApiVersion.allVersions) {
       val leaderAndIsrRequestVersion: Short =
-        if (apiVersion >= KAFKA_2_4_IV1) 4
+        if (apiVersion >= KAFKA_2_8_IV0) 5

Review comment:
       That's a good idea







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540354106



##########
File path: core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala
##########
@@ -87,7 +90,10 @@ class ControllerChannelManagerTest {
     val LeaderAndIsrResponseReceived(leaderAndIsrResponse, brokerId) = batch.sentEvents.head
     assertEquals(2, brokerId)
     assertEquals(partitions.keySet,
-      leaderAndIsrResponse.partitions.asScala.map(p => new TopicPartition(p.topicName, p.partitionIndex)).toSet)
+      leaderAndIsrResponse.topics.asScala.map(t => t.partitionErrors.asScala.map(p =>
+        new TopicPartition(topicNames.get(t.topicId).get, p.partitionIndex))).flatMap(f => f).toSet)

Review comment:
       I thought I removed most of the get(topic).gets, but I guess I missed a few. Thanks for catching. And I agree about flatMap as well.







[GitHub] [kafka] jolshan commented on pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#issuecomment-746586891


   @rajinisivaram Yup, will look at the rebase next.





[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544483664



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       I might think about making this code cleaner in general to avoid so many nested if statements







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544483238



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       Oops. I think I deleted something when I cleaned up this block. There should be a check if log.topicId.equals(id). If so, then the file exists and we shouldn't go into the block that says "// There is not yet a topic ID stored in the log."
   
   I should also fix the topicIds.get(topicPartition.topic) above and replace with id.
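
The branching described in this comment can be sketched as a small decision function. This is a hedged illustration only: `TopicIdCheck`, `Action`, and `decide` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the in-memory log ID is assumed to be non-null and to default to the all-zero ID.

```java
import java.util.UUID;

// Hypothetical sketch of the topic ID comparison; not the ReplicaManager code.
class TopicIdCheck {
    // Stand-in for Uuid.ZERO_UUID: means "no topic ID assigned".
    static final UUID ZERO_ID = new UUID(0L, 0L);

    enum Action { IGNORE_REQUEST_ID, ALREADY_STORED, WRITE_TO_FILE, WARN_MISMATCH }

    static Action decide(UUID logTopicId, UUID requestTopicId) {
        // A request from an older protocol version carries no usable topic ID.
        if (requestTopicId == null || requestTopicId.equals(ZERO_ID)) {
            return Action.IGNORE_REQUEST_ID;
        }
        // The ID was already loaded from the file or set earlier: nothing to write.
        if (logTopicId.equals(requestTopicId)) {
            return Action.ALREADY_STORED;
        }
        // No ID in memory yet: persist the one from the request.
        if (logTopicId.equals(ZERO_ID)) {
            return Action.WRITE_TO_FILE;
        }
        // Two different non-zero IDs: the broker's copy of the topic is stale.
        return Action.WARN_MISMATCH;
    }
}
```

Flattening the checks into early returns like this is also one way to avoid the nested if statements in the quoted hunk.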







[GitHub] [kafka] rite2nikhil commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
rite2nikhil commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r539903721



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {

Review comment:
       Maybe I missed it, but is there a test checking the correctness of the versioning?







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540577792



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -138,14 +145,32 @@ public LeaderAndIsrResponse getErrorResponse(int throttleTimeMs, Throwable e) {
         Errors error = Errors.forException(e);
         responseData.setErrorCode(error.code());
 
-        List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
-        for (LeaderAndIsrPartitionState partition : partitionStates()) {
-            partitions.add(new LeaderAndIsrPartitionError()
-                .setTopicName(partition.topicName())
-                .setPartitionIndex(partition.partitionIndex())
-                .setErrorCode(error.code()));
+        if (version() < 5) {
+            List<LeaderAndIsrPartitionError> partitions = new ArrayList<>();
+            for (LeaderAndIsrPartitionState partition : partitionStates()) {
+                partitions.add(new LeaderAndIsrPartitionError()
+                        .setTopicName(partition.topicName())
+                        .setPartitionIndex(partition.partitionIndex())
+                        .setErrorCode(error.code()));
+            }
+            responseData.setPartitionErrors(partitions);
+            return new LeaderAndIsrResponse(responseData);
+        }
+
+        List<LeaderAndIsrTopicError> topics = new ArrayList<>();

Review comment:
       So the partition above uses an iterable, and I'm not sure if there is a way to grab the size without iterating through. Let me know if I'm forgetting something. 
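
For context, `java.lang.Iterable` itself declares no size method, so a general-purpose count has to iterate unless the value happens to be a `Collection`. A minimal sketch of that fallback follows; `IterableSizes` and `sizeOf` are hypothetical names, not helpers from the Kafka code base.

```java
import java.util.Collection;

// Hypothetical helper; shown only to illustrate the Iterable limitation.
class IterableSizes {
    static int sizeOf(Iterable<?> items) {
        // Collections know their size without a traversal.
        if (items instanceof Collection) {
            return ((Collection<?>) items).size();
        }
        // Anything else has to be walked once to be counted.
        int count = 0;
        for (Object ignored : items) {
            count++;
        }
        return count;
    }
}
```

Whether the extra pass is worth it just to presize a list is a judgment call; leaving the `ArrayList` at its default capacity avoids the second traversal.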







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543487703



##########
File path: core/src/main/scala/kafka/server/PartitionMetadataFile.scala
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.util.regex.Pattern
+
+import kafka.utils.Logging
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.utils.Utils
+
+
+
+object PartitionMetadataFile {
+  private val LeaderEpochCheckpointFilename = "partition.metadata"

Review comment:
       Good catch. 







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540537367



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get

Review comment:
       Oops. Looks like I missed another one.







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543679600



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1021,6 +1036,13 @@ class Log(@volatile private var _dir: File,
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
           initializeLeaderEpochCache()
+          if (!partitionMetadataFile.isEmpty && !partitionMetadataFile.get.isEmpty()) {
+            val partitionMetadata = partitionMetadataFile.get.read()
+            initializePartitionMetadata()
+            partitionMetadataFile.get.write(partitionMetadata.topicId)

Review comment:
       Oh, I did misunderstand. The file is already in place, so initializePartitionMetadata is all that is needed to get the correct file path.







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543663481



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       I think that if we reach here, we are in an unexpected state. The partitionMetadata file should have loaded in the topic ID if the file already contained content. I left it as a warning rather than an exception.







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r527812774



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
           replicaFetcherManager.shutdownIdleFetcherThreads()
           replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
           onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-            new LeaderAndIsrPartitionError()
-              .setTopicName(tp.topic)
-              .setPartitionIndex(tp.partition)
-              .setErrorCode(error.code)
-          }.toBuffer
-          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code)
-            .setPartitionErrors(responsePartitions.asJava))
+          if (leaderAndIsrRequest.version() < 4) {

Review comment:
       Good catch. Thanks!







[GitHub] [kafka] rajinisivaram commented on pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#issuecomment-748345741


   @jolshan Thanks for the PR. The last build was good, so merging to trunk.





[GitHub] [kafka] dengziming commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
dengziming commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r527643380



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
           replicaFetcherManager.shutdownIdleFetcherThreads()
           replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
           onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-            new LeaderAndIsrPartitionError()
-              .setTopicName(tp.topic)
-              .setPartitionIndex(tp.partition)
-              .setErrorCode(error.code)
-          }.toBuffer
-          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code)
-            .setPartitionErrors(responsePartitions.asJava))
+          if (leaderAndIsrRequest.version() < 4) {

Review comment:
       Should this be `version() < 5`?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
           replicaFetcherManager.shutdownIdleFetcherThreads()
           replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
           onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-            new LeaderAndIsrPartitionError()
-              .setTopicName(tp.topic)
-              .setPartitionIndex(tp.partition)
-              .setErrorCode(error.code)
-          }.toBuffer
-          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code)
-            .setPartitionErrors(responsePartitions.asJava))
+          if (leaderAndIsrRequest.version() < 4) {
+            val responsePartitions = responseMap.iterator.map { case (tp, error) =>
+              new LeaderAndIsrPartitionError()
+                .setTopicName(tp.topic)
+                .setPartitionIndex(tp.partition)
+                .setErrorCode(error.code)
+            }.toBuffer
+            new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+              .setErrorCode(Errors.NONE.code)
+              .setPartitionErrors(responsePartitions.asJava))
+          } else {
+            val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]]
+            responseMap.asJava.forEach { case (tp, error) =>
+              if (topics.get(tp.topic) == None) {

Review comment:
       This can be changed to `!topics.contains(tp.topic)`.







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r543243916



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrRequest.java
##########
@@ -171,6 +196,14 @@ public long brokerEpoch() {
         return data.ungroupedPartitionStates();
     }
 
+    public Map<String, Uuid> topicIds() {
+        Map<String, Uuid> topicIds = new HashMap<>();
+        for (LeaderAndIsrTopicState ts : data.topicStates()) {
+            topicIds.put(ts.topicName(), ts.topicId());
+        }
+        return topicIds;

Review comment:
       Should be able to replace this with something like:
   ```
   return data.topicStates().stream()
     .collect(Collectors.toMap(LeaderAndIsrTopicState::topicName, LeaderAndIsrTopicState::topicId));
   ```
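A minimal, self-contained sketch of that suggestion follows. `TopicState` is a hypothetical stand-in for `LeaderAndIsrTopicState`, and `java.util.UUID` stands in for Kafka's `Uuid`, so the example runs without Kafka on the classpath. One behavioral difference to keep in mind: `Collectors.toMap` throws `IllegalStateException` on a duplicate key, whereas the `HashMap.put` loop silently keeps the last value.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

public class TopicIdsSketch {
    // Hypothetical stand-in for LeaderAndIsrTopicState (not the real generated class).
    static final class TopicState {
        private final String topicName;
        private final UUID topicId;

        TopicState(String topicName, UUID topicId) {
            this.topicName = topicName;
            this.topicId = topicId;
        }

        String topicName() { return topicName; }
        UUID topicId() { return topicId; }
    }

    // Stream-based equivalent of the explicit loop: topic name -> topic ID.
    static Map<String, UUID> topicIds(List<TopicState> topicStates) {
        return topicStates.stream()
            .collect(Collectors.toMap(TopicState::topicName, TopicState::topicId));
    }

    public static void main(String[] args) {
        UUID fooId = UUID.randomUUID();
        UUID barId = UUID.randomUUID();
        Map<String, UUID> ids = topicIds(Arrays.asList(
            new TopicState("foo", fooId),
            new TopicState("bar", barId)));
        System.out.println(ids.size() + " topics; foo -> " + ids.get("foo"));
    }
}
```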

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -58,8 +68,17 @@ public Errors error() {
         Errors error = error();
         if (error != Errors.NONE)

Review comment:
       nit: We should add braces here since there are multiple lines inside the `if` statement

##########
File path: core/src/main/scala/kafka/server/PartitionMetadataFile.scala
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.util.regex.Pattern
+
+import kafka.utils.Logging
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.utils.Utils
+
+
+
+object PartitionMetadataFile {
+  private val LeaderEpochCheckpointFilename = "partition.metadata"
+  private val WhiteSpacesPattern = Pattern.compile(":\\s+")
+  private val CurrentVersion = 0
+
+  def newFile(dir: File): File = new File(dir, LeaderEpochCheckpointFilename)
+
+  object PartitionMetadataFileFormatter {
+    def toFile(data: PartitionMetadata): String = {
+      s"version: ${data.version}\ntopic_id: ${data.topicId}"
+    }
+
+  }
+
+  class PartitionMetadataReadBuffer[T](location: String,
+                                       reader: BufferedReader,
+                                       version: Int) extends Logging {
+    def read(): PartitionMetadata = {
+      def malformedLineException(line: String) =
+        new IOException(s"Malformed line in checkpoint file ($location): '$line'")
+
+      var line: String = null
+      var metadataTopicId: Uuid = null
+      try {
+        line = reader.readLine()
+        WhiteSpacesPattern.split(line) match {
+          case Array(_, version) =>
+            if (version.toInt == CurrentVersion) {
+              line = reader.readLine()
+              WhiteSpacesPattern.split(line) match {
+                case Array(_, topicId) => metadataTopicId = Uuid.fromString(topicId)
+                case _ => throw malformedLineException(line)
+              }
+              new PartitionMetadata(CurrentVersion, metadataTopicId)
+            } else {
+              throw new IOException(s"Unrecognized version of the checkpoint file ($location): " + version)

Review comment:
       Update the error message? It still refers to a checkpoint file.
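To make the two-line file format concrete, here is a rough sketch of a parser whose messages name the partition metadata file rather than a checkpoint file. It is an illustration under assumptions, not the PR's code: `parseTopicId` and `fieldValue` are hypothetical names, the topic ID is kept as a plain string, and `IllegalArgumentException` is used where the PR throws `IOException` from a `BufferedReader`-based reader.

```java
import java.util.regex.Pattern;

public class PartitionMetadataParseSketch {
    // Same separator as the PR's WhiteSpacesPattern: a colon followed by whitespace.
    private static final Pattern SEPARATOR = Pattern.compile(":\\s+");
    private static final int CURRENT_VERSION = 0;

    // Parses contents of the form "version: 0\ntopic_id: <id>" and returns the id text.
    static String parseTopicId(String contents, String location) {
        String[] lines = contents.split("\n");
        if (lines.length < 2) {
            throw new IllegalArgumentException(
                "Malformed partition metadata file (" + location + "): expected two lines");
        }
        int version = Integer.parseInt(fieldValue(lines[0], location));
        if (version != CURRENT_VERSION) {
            throw new IllegalArgumentException(
                "Unrecognized version of the partition metadata file (" + location + "): " + version);
        }
        return fieldValue(lines[1], location);
    }

    // Splits "key: value" and returns the value, rejecting any other shape.
    private static String fieldValue(String line, String location) {
        String[] parts = SEPARATOR.split(line);
        if (parts.length != 2) {
            throw new IllegalArgumentException(
                "Malformed line in partition metadata file (" + location + "): '" + line + "'");
        }
        return parts[1];
    }

    public static void main(String[] args) {
        System.out.println(parseTopicId("version: 0\ntopic_id: example-id", "/tmp/partition.metadata"));
    }
}
```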

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       why is this a warning?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get

Review comment:
       Should we use `controllerContext.topicNames.get(topic.topicId).foreach` instead of `.get` to avoid throwing an exception if the topic is not in the context?

##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {
+      leaderAndIsrResponse.partitions.forEach { partition =>
+        val topicName = partition.topicName
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp
+      }
+    }
+
+    leaderAndIsrResponse.topics.forEach { topic =>
+      val topicName = controllerContext.topicNames.get(topic.topicId).get
+      topic.partitionErrors().forEach { partition =>
+        val tp = new TopicPartition(topicName, partition.partitionIndex)
+        if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
+          offlineReplicas += tp
+        else if (partition.errorCode == Errors.NONE.code)
+          onlineReplicas += tp

Review comment:
       We can make an inner method within this method with the common logic.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -322,6 +327,11 @@ class Log(@volatile private var _dir: File,
     // deletion.
     producerStateManager.removeStraySnapshots(segments.values().asScala.map(_.baseOffset).toSeq)
     loadProducerState(logEndOffset, reloadFromCleanShutdown = hadCleanShutdown)
+
+    // Recover topic ID if present
+    if (!partitionMetadataFile.get.isEmpty()) {
+      topicId = partitionMetadataFile.get.read().topicId
+    }

Review comment:
       Seems neater to use `partitionMetadataFile.foreach` instead of using `.get` twice.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1021,6 +1036,13 @@ class Log(@volatile private var _dir: File,
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
           initializeLeaderEpochCache()
+          if (!partitionMetadataFile.isEmpty && !partitionMetadataFile.get.isEmpty()) {

Review comment:
       `.foreach` may be better than lots of `.get`s

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1021,6 +1036,13 @@ class Log(@volatile private var _dir: File,
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
           initializeLeaderEpochCache()
+          if (!partitionMetadataFile.isEmpty && !partitionMetadataFile.get.isEmpty()) {
+            val partitionMetadata = partitionMetadataFile.get.read()
+            initializePartitionMetadata()
+            partitionMetadataFile.get.write(partitionMetadata.topicId)

Review comment:
       Why are we reading and writing again?

##########
File path: core/src/main/scala/kafka/server/PartitionMetadataFile.scala
##########
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.{BufferedReader, BufferedWriter, File, FileOutputStream, IOException, OutputStreamWriter}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{FileAlreadyExistsException, Files, Paths}
+import java.util.regex.Pattern
+
+import kafka.utils.Logging
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.utils.Utils
+
+
+
+object PartitionMetadataFile {
+  private val LeaderEpochCheckpointFilename = "partition.metadata"

Review comment:
       `LeaderEpochCheckpointFilename` => `PartitionMetadataFilename`?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)

Review comment:
       As above, we can use `id`

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))

Review comment:
       we already have the topic id in `id`.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1445,15 +1483,38 @@ class ReplicaManager(val config: KafkaConfig,
           replicaFetcherManager.shutdownIdleFetcherThreads()
           replicaAlterLogDirsManager.shutdownIdleFetcherThreads()
           onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
-          val responsePartitions = responseMap.iterator.map { case (tp, error) =>
-            new LeaderAndIsrPartitionError()
-              .setTopicName(tp.topic)
-              .setPartitionIndex(tp.partition)
-              .setErrorCode(error.code)
-          }.toBuffer
-          new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
-            .setErrorCode(Errors.NONE.code)
-            .setPartitionErrors(responsePartitions.asJava))
+          if (leaderAndIsrRequest.version() < 5) {
+            val responsePartitions = responseMap.iterator.map { case (tp, error) =>
+              new LeaderAndIsrPartitionError()
+                .setTopicName(tp.topic)
+                .setPartitionIndex(tp.partition)
+                .setErrorCode(error.code)
+            }.toBuffer
+            new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
+              .setErrorCode(Errors.NONE.code)
+              .setPartitionErrors(responsePartitions.asJava))
+          } else {
+            val topics = new mutable.HashMap[String, List[LeaderAndIsrPartitionError]]
+            responseMap.asJava.forEach { case (tp, error) =>
+              if (!topics.contains(tp.topic)) {
+                topics.put(tp.topic, List(new LeaderAndIsrPartitionError()
+                                                                .setPartitionIndex(tp.partition)
+                                                                .setErrorCode(error.code)))
+              } else {
+                topics.put(tp.topic, new LeaderAndIsrPartitionError()
+                  .setPartitionIndex(tp.partition)
+                  .setErrorCode(error.code)::topics.get(tp.topic).get)

Review comment:
       Replace `topics.get(tp.topic).get` with `topics(tp.topic)`







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540513762



##########
File path: core/src/main/scala/kafka/controller/KafkaController.scala
##########
@@ -1378,12 +1378,26 @@ class KafkaController(val config: KafkaConfig,
     val offlineReplicas = new ArrayBuffer[TopicPartition]()
     val onlineReplicas = new ArrayBuffer[TopicPartition]()
 
-    leaderAndIsrResponse.partitions.forEach { partition =>
-      val tp = new TopicPartition(partition.topicName, partition.partitionIndex)
-      if (partition.errorCode == Errors.KAFKA_STORAGE_ERROR.code)
-        offlineReplicas += tp
-      else if (partition.errorCode == Errors.NONE.code)
-        onlineReplicas += tp
+    if (leaderAndIsrResponse.topics().isEmpty) {

Review comment:
       Yeah. One option I thought of would be to do something like partitions(Map<id, string>) and handle it inside the response with the version. That might be a little cleaner but I'm not sure.







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540510301



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -45,8 +47,16 @@ public LeaderAndIsrResponse(Struct struct, short version) {
         this.data = new LeaderAndIsrResponseData(struct, version);
     }
 
-    public List<LeaderAndIsrPartitionError> partitions() {
-        return data.partitionErrors();
+    public List<LeaderAndIsrTopicError> topics() {
+        return this.data.topics();
+    }
+
+    public Iterable<LeaderAndIsrPartitionError> partitions() {
+        if (data.topics().isEmpty()) {
+            return data.partitionErrors();
+        }
+        return () -> new FlattenedIterator<>(data.topics().iterator(),
+            topic -> topic.partitionErrors().iterator());

Review comment:
       I'm thinking that in most cases where I create a LeaderAndIsrResponse with the data object, I have access to the request and can grab the version there. Then I can add it to the constructor.
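Roughly, the idea could look like the sketch below. All class names here are hypothetical stand-ins for the generated `LeaderAndIsrResponseData` types, and the version cutoff of 5 is taken from the `version() < 5` check in the ReplicaManager change. Unlike the lazy `FlattenedIterator` in the diff, this sketch copies the per-topic errors into a new list for simplicity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class VersionedResponseSketch {
    // Hypothetical stand-ins for the generated response data classes.
    static final class PartitionError {
        final int partitionIndex;
        final short errorCode;

        PartitionError(int partitionIndex, short errorCode) {
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
        }
    }

    static final class TopicError {
        final List<PartitionError> partitionErrors;

        TopicError(List<PartitionError> partitionErrors) {
            this.partitionErrors = partitionErrors;
        }
    }

    static final class ResponseData {
        final List<PartitionError> partitionErrors; // flat layout, versions below 5
        final List<TopicError> topics;              // per-topic layout, version 5 and up

        ResponseData(List<PartitionError> partitionErrors, List<TopicError> topics) {
            this.partitionErrors = partitionErrors;
            this.topics = topics;
        }
    }

    private final ResponseData data;
    private final short version;

    // The version comes from the request that this response answers.
    VersionedResponseSketch(ResponseData data, short version) {
        this.data = data;
        this.version = version;
    }

    // Branches on the version instead of on whether topics() happens to be empty.
    List<PartitionError> partitions() {
        if (version < 5) {
            return data.partitionErrors;
        }
        List<PartitionError> flattened = new ArrayList<>();
        for (TopicError topic : data.topics) {
            flattened.addAll(topic.partitionErrors);
        }
        return flattened;
    }

    public static void main(String[] args) {
        PartitionError p0 = new PartitionError(0, (short) 0);
        PartitionError p1 = new PartitionError(1, (short) 0);
        ResponseData flat = new ResponseData(Arrays.asList(p0, p1), Collections.emptyList());
        ResponseData grouped = new ResponseData(Collections.emptyList(),
            Collections.singletonList(new TopicError(Arrays.asList(p0, p1))));
        System.out.println(new VersionedResponseSketch(flat, (short) 4).partitions().size());
        System.out.println(new VersionedResponseSketch(grouped, (short) 5).partitions().size());
    }
}
```

Branching on the version also distinguishes a version 5 response that legitimately has no topics from an older flat response, which the `topics().isEmpty()` check cannot do.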







[GitHub] [kafka] jolshan commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r540309740



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/LeaderAndIsrResponse.java
##########
@@ -45,8 +47,16 @@ public LeaderAndIsrResponse(Struct struct, short version) {
         this.data = new LeaderAndIsrResponseData(struct, version);
     }
 
-    public List<LeaderAndIsrPartitionError> partitions() {
-        return data.partitionErrors();
+    public List<LeaderAndIsrTopicError> topics() {
+        return this.data.topics();
+    }
+
+    public Iterable<LeaderAndIsrPartitionError> partitions() {
+        if (data.topics().isEmpty()) {
+            return data.partitionErrors();
+        }
+        return () -> new FlattenedIterator<>(data.topics().iterator(),
+            topic -> topic.partitionErrors().iterator());

Review comment:
       I agree. I think I had some problems with defining the version when the constructor only provides the data. Is there a way to get the version with just the data? 







[GitHub] [kafka] rajinisivaram commented on a change in pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
rajinisivaram commented on a change in pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#discussion_r544204490



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1434,6 +1447,31 @@ class ReplicaManager(val config: KafkaConfig,
            */
             if (localLog(topicPartition).isEmpty)
               markPartitionOffline(topicPartition)
+            else {
+              val id = topicIds.get(topicPartition.topic())
+              // Ensure we have not received a request from an older protocol
+              if (id != null && !id.equals(Uuid.ZERO_UUID)) {
+                val log = localLog(topicPartition).get
+                // Check if the topic ID is in memory, if not, it must be new to the broker.
+                // If the broker previously wrote it to file, it would be recovered on restart after failure.
+                // If the topic ID is not the default (ZERO_UUID), a topic ID is being used for the given topic.
+                // If the topic ID in the log does not match the one in the request, the broker's topic must be stale.
+                if (!log.topicId.equals(Uuid.ZERO_UUID) && !log.topicId.equals(topicIds.get(topicPartition.topic))) {
+                  stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                    s" match the topic Id provided in the request: " +
+                    s"${topicIds.get(topicPartition.topic).toString}.")
+                } else {
+                  // There is not yet a topic ID stored in the log.
+                  // Write the partition metadata file if it is empty.
+                  if (log.partitionMetadataFile.get.isEmpty()) {
+                    log.partitionMetadataFile.get.write(topicIds.get(topicPartition.topic))
+                    log.topicId = topicIds.get(topicPartition.topic)
+                  } else {
+                    stateChangeLogger.warn("Partition metadata file already contains content.")

Review comment:
       Hmm, looking at the conditional statements here, it looks like we would write the file the first time we get here because `log.partitionMetadataFile.get.isEmpty()` is true, and the second time we would print a warning even if the ID in the file matches the expected ID. Unless I missed something.
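One possible way to separate the cases, sketched as a pure decision function so the branches are easy to see. This is an illustration only, not what the PR does: `decide` and the `Action` names are hypothetical, and `java.util.UUID` with an all-zero value stands in for Kafka's `Uuid.ZERO_UUID`.

```java
import java.util.UUID;

public class TopicIdCheckSketch {
    // Stand-in for Uuid.ZERO_UUID, the "no topic ID" default.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    enum Action { IGNORE_REQUEST_WITHOUT_ID, WRITE_ID, NOTHING_TO_DO, WARN_MISMATCH }

    // requestId: the ID carried by the LeaderAndIsr request (null or zero for older protocols).
    // logId: the ID currently held in memory by the log (zero if none was recovered or set).
    static Action decide(UUID requestId, UUID logId) {
        if (requestId == null || requestId.equals(ZERO_UUID)) {
            return Action.IGNORE_REQUEST_WITHOUT_ID;
        }
        if (logId.equals(ZERO_UUID)) {
            return Action.WRITE_ID;      // first time: persist the ID and set it in memory
        }
        if (logId.equals(requestId)) {
            return Action.NOTHING_TO_DO; // later requests with the same ID: no warning
        }
        return Action.WARN_MISMATCH;     // the log holds a different, possibly stale, ID
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        System.out.println(decide(id, ZERO_UUID));
        System.out.println(decide(id, id));
        System.out.println(decide(id, UUID.randomUUID()));
    }
}
```

With this shape, the "file already contains content" warning is only reachable when the in-memory ID and the file disagree, which would be the genuinely unexpected state.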







[GitHub] [kafka] dengziming commented on pull request #9626: KAFKA-10545: Create topic IDs and propagate to brokers

Posted by GitBox <gi...@apache.org>.
dengziming commented on pull request #9626:
URL: https://github.com/apache/kafka/pull/9626#issuecomment-738773845


   KAFKA-10729 added `KAFKA_2_8_IV0`, so you should bump the version again.

