Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/08 23:15:00 UTC

[GitHub] [kafka] jolshan opened a new pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

jolshan opened a new pull request #10282:
URL: https://github.com/apache/kafka/pull/10282


   KIP-516 introduced the partition.metadata file to persist the topic ID on the broker. It is created when ReplicaManager handles the LeaderAndIsrRequest. (See https://github.com/apache/kafka/pull/10143 for the code path.) RaftReplicaManager was missing the analogous code path for KIP-500 code. As in ReplicaManager, RaftReplicaManager will now check or create the partition.metadata file when handling metadata records.
   
   Adds tests to ensure that the file is created and that the correct error is thrown when the topic ID is inconsistent.
   
   Also adds a few methods to get the topic ID from MetadataImageBuilder, as this is the most convenient way to get the topic ID from RaftReplicaManager.
   I thought it made sense to add to the path that defers and the path that does not defer. Let me know if this assumption was not correct.
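
The check-or-create behavior described above can be sketched roughly as follows. This is a minimal illustration, not the code in this PR: `PartitionMetadataSketch`, `checkOrCreate`, and the local `InconsistentTopicId` exception are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the file is assumed to hold only the ID as text, which is simpler than the real partition.metadata layout.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}
import java.util.UUID

// Local stand-in for Kafka's InconsistentTopicIdException (hypothetical).
final class InconsistentTopicId(msg: String) extends RuntimeException(msg)

object PartitionMetadataSketch {
  // Assumption: the file holds only the topic ID as text.
  def checkOrCreate(logDir: Path, receivedId: UUID): Unit = {
    val file = logDir.resolve("partition.metadata")
    if (Files.exists(file)) {
      // File already present: the received ID must match what was persisted.
      val text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim
      val onDisk = UUID.fromString(text)
      if (onDisk != receivedId)
        throw new InconsistentTopicId(
          s"Log in $logDir has topic ID $onDisk but received $receivedId")
    } else {
      // No file yet: persist the received ID.
      Files.write(file, receivedId.toString.getBytes(StandardCharsets.UTF_8))
    }
  }
}
```

Calling `checkOrCreate` twice with the same ID is a no-op the second time; calling it with a different ID raises the stand-in exception.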
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597863289



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,14 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
+      if (log.topicId == None && topicId.isDefined) {

Review comment:
       nit: we can do `topicId.foreach`
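
For illustration, the `foreach` form might look something like the sketch below. `LogStub` and `assignIfMissing` are hypothetical names used only here, and `java.util.UUID` stands in for Kafka's `Uuid`; the real change would also write the partition.metadata file.

```scala
import java.util.UUID

// Hypothetical stand-in for the log; only the topicId field matters here.
final class LogStub(var topicId: Option[UUID] = None)

object ForeachSketch {
  // Runs the body only when an ID was provided, which replaces the
  // explicit `topicId.isDefined` test and the later `topicId.get`.
  def assignIfMissing(log: LogStub, topicId: Option[UUID]): Unit =
    topicId.foreach { id =>
      if (log.topicId.isEmpty) log.topicId = Some(id)
    }
}
```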

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,31 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {
+          if (receivedTopicId != logTopicId) {
+            // not sure if we need both the logger and the error thrown
+            stateChangeLogger.error(s"Topic Id in memory: $logTopicId does not" +

Review comment:
       nit: can we be consistent about naming topicId? Here we use "Topic Id" while in the message below we use "Topic ID." I slightly prefer the more compact camel-case "topicId," but I don't feel strongly about it.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)

Review comment:
       You should be able to do something like `!topicId.contains(fileTopicId)`
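
One detail worth checking when making this change: `!topicId.contains(fileTopicId)` is also true when `topicId` is `None`, whereas the condition in the diff is false in that case. The sketch below compares the variants; `TopicIdMismatch` and its method names are made up for this example, and `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID

object TopicIdMismatch {
  // The check as written in the diff.
  def original(topicId: Option[UUID], fileTopicId: UUID): Boolean =
    topicId.isDefined && fileTopicId != topicId.get

  // Same truth table as `original`: None never counts as a mismatch.
  def viaExists(topicId: Option[UUID], fileTopicId: UUID): Boolean =
    topicId.exists(_ != fileTopicId)

  // Differs from `original` when topicId is None: reports a mismatch there.
  def viaNotContains(topicId: Option[UUID], fileTopicId: UUID): Boolean =
    !topicId.contains(fileTopicId)
}
```

If the `None` case must not throw, `exists` keeps the original behavior; `contains` would need an additional guard.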

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,31 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {
+          if (receivedTopicId != logTopicId) {
+            // not sure if we need both the logger and the error thrown
+            stateChangeLogger.error(s"Topic Id in memory: $logTopicId does not" +
+              s" match the topic Id for partition $topicPartition received: " +
+              s"$receivedTopicId.")
+            throw new InconsistentTopicIdException(s"Topic partition $topicPartition had an inconsistent topic ID.")
+          }
+        })
+      case None => throw new IllegalStateException(

Review comment:
       nit: it looks a little weird that we indent here, but we don't for `InconsistentTopicIdException` above

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1473,6 +1484,30 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Checks if the topic ID provided in the LeaderAndIsr request is consistent with the topic ID in the log.
+   *
+   * If the request had an invalid topic ID (null or zero), then we assume that topic IDs are not supported.
+   * The topic ID was not inconsistent, so return true.
+   * If the log does not exist or the topic ID is not yet set, logTopicIdOpt will be None.
+   * In both cases, the ID is not inconsistent so return true.
+   *
+   * @param requestTopicId the topic ID from the LeaderAndIsr request
+   * @param logTopicIdOpt the topic ID in the log if the log and the topic ID exist
+   * @return true if the request topic id is consistent, false otherwise
+   */
+  private def checkTopicId(requestTopicId: Uuid, logTopicIdOpt: Option[Uuid]): Boolean = {

Review comment:
       It will read a little easier if we name this `hasConsistentTopicId`.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       My feeling is that we should be able to get rid of `keepPartitionMetadataFile`. We should just create the file whenever a topicId is provided. Not sure if there is anything I'm missing. I don't think the file would cause any harm if we downgraded, would it?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,14 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
+      if (log.topicId == None && topicId.isDefined) {
+        log.partitionMetadataFile.write(topicId.get)
+        log.topicId = Some(topicId.get)

Review comment:
       We can create a method in `Log` to update the topicId which does both of these operations.
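
A rough sketch of what such a method could look like, assuming an in-memory stand-in for the metadata file. `MetadataFileStub`, `LogSketch`, and `assignTopicId` are hypothetical names, not the API that ended up in `Log`.

```scala
import java.util.UUID

// Hypothetical in-memory stand-in for the partition.metadata file.
final class MetadataFileStub {
  private var stored: Option[UUID] = None
  def write(id: UUID): Unit = { stored = Some(id) }
  def read(): Option[UUID] = stored
}

final class LogSketch(val partitionMetadataFile: MetadataFileStub) {
  @volatile private var _topicId: Option[UUID] = None
  def topicId: Option[UUID] = _topicId

  // Persists the ID and then publishes it in memory, so callers
  // cannot perform one step without the other.
  def assignTopicId(id: UUID): Unit = {
    partitionMetadataFile.write(id)
    _topicId = Some(id)
  }
}
```

With this shape, `Partition` would call `log.assignTopicId(id)` rather than touching the file and the field separately.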

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -262,6 +266,7 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           private val hadCleanShutdown: Boolean = true,
+          @volatile var topicId : Option[Uuid] = None,

Review comment:
       Let's make this a mandatory parameter. Optional parameters are dangerous because they can be overlooked. As an example of the danger, the optional argument just above this one led to this bug: https://issues.apache.org/jira/browse/KAFKA-12504.
   
   Also nit: remove space before the colon.
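
To make the concern concrete, here is a small sketch of the difference. The class names and the directory string are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID

// With a default, a call site that forgets the argument still compiles.
final class LogWithDefault(val dir: String, val topicId: Option[UUID] = None)

// Without a default, every call site has to state its choice, even if it is None.
final class LogMandatory(val dir: String, val topicId: Option[UUID])

object ParamSketch {
  // Compiles and silently gets None.
  val forgotten = new LogWithDefault("/tmp/topic-0")
  // None is now a visible decision at the call site.
  val explicit = new LogMandatory("/tmp/topic-0", None)
  // new LogMandatory("/tmp/topic-0") would not compile.
}
```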

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1326,6 +1326,14 @@ class ReplicaManager(val config: KafkaConfig,
             s"epoch ${leaderAndIsrRequest.controllerEpoch}")
         }
       val topicIds = leaderAndIsrRequest.topicIds()
+      def getTopicId (topicName: String): Option[Uuid] = {

Review comment:
       nit: remove space before parenthesis

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       I was looking into the uses of this and found this one in `KafkaApis.handleDeleteTopicsRequest`.
   
   ```scala
           if (!config.usesTopicId && topicIdsFromRequest.contains(topic.topicId)) {
             topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
             topic.setErrorMessage("Topic IDs are not supported on the server.")
   ```
   
   I'm having a little trouble understanding why this check is necessary. If we are not using topic IDs, then it seems reasonable to return UNKNOWN_TOPIC_ID.
   
   My concern is the following case. Say I am doing a roll of the cluster to upgrade the IBP to a version which supports topicIds. The controller may get upgraded first and begin creating topics with topicIds before the rest of the cluster has an upgraded IBP. Now if the user tries to delete the topic using the topicId, it could be sent to one of the brokers which does not have the bumped IBP. This would cause an UNSUPPORTED_VERSION error, which would be very surprising.
   
   If you agree this is a problem, we can open a separate issue to address it.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1473,6 +1484,30 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Checks if the topic ID provided in the LeaderAndIsr request is consistent with the topic ID in the log.
+   *
+   * If the request had an invalid topic ID (null or zero), then we assume that topic IDs are not supported.
+   * The topic ID was not inconsistent, so return true.
+   * If the log does not exist or the topic ID is not yet set, logTopicIdOpt will be None.
+   * In both cases, the ID is not inconsistent so return true.
+   *
+   * @param requestTopicId the topic ID from the LeaderAndIsr request
+   * @param logTopicIdOpt the topic ID in the log if the log and the topic ID exist
+   * @return true if the request topic id is consistent, false otherwise
+   */
+  private def checkTopicId(requestTopicId: Uuid, logTopicIdOpt: Option[Uuid]): Boolean = {
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) {
+      true
+    } else
+      logTopicIdOpt match {

Review comment:
       You can use `logTopicIdOpt.contains(requestTopicId)`
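
A possible shape for the simplified method is sketched below, using the `hasConsistentTopicId` name suggested earlier in this review. Note that the doc comment says a missing log topic ID should count as consistent, and `contains` returns false for `None`, so this sketch uses `forall` to keep that behavior. `TopicIdCheck` and `ZeroUuid` are hypothetical names; `ZeroUuid` is an assumed stand-in for `Uuid.ZERO_UUID`.

```scala
import java.util.UUID

object TopicIdCheck {
  // Stand-in for Kafka's Uuid.ZERO_UUID (assumption: all-zero bits).
  val ZeroUuid = new UUID(0L, 0L)

  // True when the request carries no usable ID, when the log has no ID yet,
  // or when both IDs are present and equal.
  def hasConsistentTopicId(requestTopicId: UUID, logTopicIdOpt: Option[UUID]): Boolean =
    requestTopicId == null || requestTopicId == ZeroUuid ||
      logTopicIdOpt.forall(_ == requestTopicId)
}
```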

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")

Review comment:
       Can you include the topic partition in this error message?







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmm, this is a little tricky. One idea I had would be to check or set the ID in the partition object only (not the file); then, when the partitions are passed in to `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store it in the partition _instead of_ or _in addition to_ Log. Not sure if we also want to change this for the ZK code. Thoughts?







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r608243332



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       Ah, I totally linked the wrong PR here. I meant my topic ID fetch PR.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r598142353



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       We would rewrite in ZK, not the file. So we would always get an inconsistent topic ID.
   
   We can overwrite in ZK when we add partitions/change ISR since this information is stored in the ZNode along with topic ID. When we write to it using an older version, we can't keep the topic ID field and it is lost.
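
The loss described here can be modeled loosely as follows. This is only an illustration of one reading of the comment: `TopicZNode` and `rewriteWithOldVersion` are hypothetical, the payload is reduced to an assignment map plus an optional ID, and `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID

// Hypothetical model of the topic ZNode payload.
final case class TopicZNode(assignment: Map[Int, Seq[Int]], topicId: Option[UUID])

object ZkRewriteSketch {
  // An older writer does not know about topic IDs, so when it rewrites the
  // ZNode (for example, to add partitions) the field is dropped.
  def rewriteWithOldVersion(current: TopicZNode,
                            newAssignment: Map[Int, Seq[Int]]): TopicZNode =
    current.copy(assignment = newAssignment, topicId = None)
}
```

After such a rewrite, the ID in ZK is gone while the partition.metadata file on the broker still holds the old one, which is where the inconsistency would come from.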







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596251187



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       I suppose one option is to directly associate through Partition.createLogIfNotExists when we get or create the partition a few lines earlier. We call this in makeLeaders/makeFollowers as well, so it's no extra work.







[GitHub] [kafka] hachikuji commented on pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#issuecomment-801292892


   @jolshan Yeah, we haven't gotten to implementing JBOD for KIP-500. It shouldn't be too far off though.





[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605142418



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       This is a good topic to decide on for the fetch PR.













[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r598141840



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       You are right that DeleteTopics must be handled by the active controller. I think the point still stands with a slightly different scenario. The controller might go from a new version with an updated IBP to an old version during the roll. This could happen because of a session timeout. It is an unlikely case to be sure, but still possible. I'd be inclined to remove the check just to take it out of the picture. It doesn't seem to be buying us anything.







[GitHub] [kafka] jolshan commented on pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#issuecomment-801278854


   > I cannot think of a reason not to pass the topic id if we have it available. Otherwise, we would need logic to set it again after the log dir is swapped.
   @hachikuji This method is only called for ZK controllers, so it should be able to set it, but I will add the topic ID.





[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r590577901



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
    * @return true if the request topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {
+    // If the request had an invalid topic ID, then we assume that topic IDs are not supported so ID is consistent.
+    // This is only the case when from LeaderAndIsr Request. Raft code should never have invalid topic IDs.
+    if (receivedTopicId == null || receivedTopicId == Uuid.ZERO_UUID) {
+      if (usingRaft) false else true

Review comment:
       There are some scenarios during upgrades where we may have invalid topic IDs when interBrokerProtocolVersion >= KAFKA_2_8_IV0 for the receiving broker (i.e., the controller may be on a lower IBP).







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmm, this is a little tricky. One idea I had would be to check or set the ID in the partition object only (not in the file or in Log memory). Then, when the partitions are passed in to `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store it in the partition **_instead of_** or **_in addition to_** the Log. Not sure if we also want to change this for the ZK code. Thoughts?







[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r598141328



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       Not sure I follow the scenario. Why would we overwrite the file on re-upgrade and what would be the problem with that?







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596377411



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       Hmmm. I guess that makes the log creation in makeLeaders/makeFollowers superfluous. Another option is to add the partition metadata file in another place where we know we have the log associated. We know the log gets associated in `partition.createLogIfNotExists`, so maybe there? We could check whether an ID was sent in this request and whether the log does not yet have a topic ID, and assign it there. Unfortunately, this is in the raft code path too, but it shouldn't be used there since we always already assign the topic ID.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597932374



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -262,6 +266,7 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           private val hadCleanShutdown: Boolean = true,
+          @volatile var topicId : Option[Uuid] = None,

Review comment:
       Ah, I missed this one. Will do.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597999822



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1473,6 +1484,30 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Checks if the topic ID provided in the LeaderAndIsr request is consistent with the topic ID in the log.
+   *
+   * If the request had an invalid topic ID (null or zero), then we assume that topic IDs are not supported.
+   * The topic ID was not inconsistent, so return true.
+   * If the log does not exist or the topic ID is not yet set, logTopicIdOpt will be None.
+   * In both cases, the ID is not inconsistent so return true.
+   *
+   * @param requestTopicId the topic ID from the LeaderAndIsr request
+   * @param logTopicIdOpt the topic ID in the log if the log and the topic ID exist
+   * @return true if the request topic id is consistent, false otherwise
+   */
+  private def checkTopicId(requestTopicId: Uuid, logTopicIdOpt: Option[Uuid]): Boolean = {
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID) {
+      true
+    } else
+      logTopicIdOpt match {

Review comment:
       I've changed the match portion to `logTopicIdOpt.isEmpty || logTopicIdOpt.contains(requestTopicId)`
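
   For illustration, a minimal sketch of the check with that simplification applied. This is not the PR's code: `java.util.UUID` and `ZeroUuid` stand in for Kafka's `Uuid` and `Uuid.ZERO_UUID` so the snippet is self-contained, and the `TopicIdCheck` object is hypothetical.

```scala
import java.util.UUID

object TopicIdCheck {
  // Assumption: stand-in for Kafka's Uuid.ZERO_UUID, used only to keep the sketch self-contained.
  val ZeroUuid: UUID = new UUID(0L, 0L)

  // Returns true when the request ID cannot be shown to conflict with the log's ID.
  def checkTopicId(requestTopicId: UUID, logTopicIdOpt: Option[UUID]): Boolean = {
    if (requestTopicId == null || requestTopicId == ZeroUuid)
      true // the request carries no usable ID, so topic IDs are assumed unsupported
    else
      // None: no log, or no ID set on the log yet. Some(id): consistent only if the IDs match.
      logTopicIdOpt.isEmpty || logTopicIdOpt.contains(requestTopicId)
  }
}
```

   `isEmpty` covers both "no log yet" and "log has no ID yet", and `contains` is an equality test on the wrapped value, so the expression is true in exactly the cases the doc comment describes as not inconsistent.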







[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605134985



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -262,6 +266,7 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           private val hadCleanShutdown: Boolean = true,
+          @volatile var topicId: Option[Uuid],
           val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {

Review comment:
       One more thing here. Can we make this a non-optional parameter?







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r594553183



##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
##########
@@ -145,7 +145,7 @@ public void setup() {
         OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
         for (TopicPartition topicPartition : topicPartitions) {
             final Partition partition = this.replicaManager.createPartition(topicPartition);
-            partition.createLogIfNotExists(true, false, checkpoints);
+            partition.createLogIfNotExists(true, false, checkpoints, Option.empty());

Review comment:
       I think we can just not set the topic ID here, but want to confirm.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmm. Looking into this further, it seems that the log is created in RaftReplicaManager when making a leader or a follower. This occurs in the `handleMetadataRecords` method and the `endMetadataChangeDeferral` method. Each passes through a few methods before we get to `LogManager.getOrCreateLog`, so passing a topic ID through is not ideal (especially since some of this code is shared with the ZK path), but not impossible.
   
   One thing I am wondering, though, is how I tested the changes. I think I assumed the log was already created, but this code suggests that the log will not be created until `makeLeaders/Followers` in the non-deferring case, and not until `endMetadataChangeDeferral` in the deferring case. This means that where I set topic IDs now, we won't have a log on the first pass! So I think it may be worth passing the ID through these methods. I think I can keep the check of the topic ID where it is, though.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597910287



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       Yeah. We could run into issues with downgrade and re-upgrade. If we downgrade below 2.8 on ZK brokers, we may overwrite the topic ID on re-upgrade. But the file will have the old version.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593362414



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmmm I'm not a huge fan of this either since it also seems to be waiting to set the ID and it adds complexity to recovery. I'll take another look. Maybe I'm missing something.







[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596231610



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
+            // not sure if topic ID should be none here, but not sure if we have access in ReplicaManager where this is called.
+            // could also use topicId method here potentially. This is only used in ReplicaManager (ZK code) so probably ok to set as None.
+            createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints, None)
             true
         }
       }
     }
   }
 
-  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
+  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = None): Unit = {

Review comment:
       We usually try to avoid optional arguments. The problem is that they can be easily overlooked.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       Maybe I am missing something, but why is it necessary to set the topicId here? I was expecting that we would do this in `makeLeader` and `makeFollower`. I'm a tad uncomfortable exposing the `Log` object through `Partition` before we have directly associated it.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
+            // not sure if topic ID should be none here, but not sure if we have access in ReplicaManager where this is called.

Review comment:
       I cannot think of a reason not to pass the topic id if we have it available. Otherwise, we would need logic to set it again after the log dir is swapped.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmm. Looking into this further, it seems that the log is created in RaftReplicaManager when making a leader or a follower. This occurs in the `handleMetadataRecords` method (non-deferring case) and the `endMetadataChangeDeferral` method. Each passes through a few methods before we get to `LogManager.getOrCreateLog`, so passing a topic ID through is not ideal (especially since some of this code is shared with the ZK path), but not impossible.
   
   One thing I am wondering, though, is how I tested the changes. I think I assumed the log was already created, but this code suggests that the log will not be created until `makeLeaders/Followers` in `handleMetadataRecords` in the non-deferring case, and not until `endMetadataChangeDeferral` in the deferring case. This means that where I set topic IDs now, we won't have a log on the first pass! So I think it may be worth passing the ID through these methods. I think I can keep the check of the topic ID where it is, though.
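
   A rough sketch of the "pass the ID through" idea, under heavy simplification. Every name here (`SketchLog`, `SketchLogManager`, `SketchPartition`) is a hypothetical stand-in rather than a Kafka class, and `java.util.UUID` replaces Kafka's `Uuid`. The only point is that the ID travels with the create call, so it is available at the moment the log first comes into existence.

```scala
import java.util.UUID
import scala.collection.mutable

object TopicIdThreading {
  // Hypothetical stand-in for a log that records its topic ID at creation time.
  final case class SketchLog(name: String, topicId: Option[UUID])

  // Hypothetical stand-in for LogManager: the log is created on the first request only.
  class SketchLogManager {
    private val logs = mutable.Map.empty[String, SketchLog]

    def getOrCreateLog(name: String, topicId: Option[UUID]): SketchLog =
      logs.getOrElseUpdate(name, SketchLog(name, topicId))
  }

  // Hypothetical stand-in for Partition: forwards the ID instead of setting it in a later pass.
  class SketchPartition(name: String, logManager: SketchLogManager) {
    def createLogIfNotExists(topicId: Option[UUID]): SketchLog =
      logManager.getOrCreateLog(name, topicId)
  }
}
```

   In this sketch a second call with `None` returns the already-created log with its original ID, which is why a separate consistency check on later calls would still be needed.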







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596243374



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
+            // not sure if topic ID should be none here, but not sure if we have access in ReplicaManager where this is called.
+            // could also use topicId method here potentially. This is only used in ReplicaManager (ZK code) so probably ok to set as None.
+            createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints, None)
             true
         }
       }
     }
   }
 
-  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints): Unit = {
+  def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid] = None): Unit = {

Review comment:
       I was a little worried about this too. Some methods (not this one) are called in many locations (mostly tests) so I didn't want to bloat the changelog. But I can change these all to be explicit.
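
   To make the concern concrete, a small sketch of how a default argument can hide a missed call site. The two signatures are hypothetical, trimmed-down versions of `createLogIfNotExists` (the real method takes more parameters), and `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID

object DefaultArgumentPitfall {
  // Hypothetical simplified signature with a default: callers may silently omit the ID.
  def createLogWithDefault(isNew: Boolean, topicId: Option[UUID] = None): Option[UUID] = topicId

  // Hypothetical simplified signature without a default: every caller must decide.
  def createLogExplicit(isNew: Boolean, topicId: Option[UUID]): Option[UUID] = topicId

  def main(args: Array[String]): Unit = {
    val id = UUID.randomUUID()
    // Compiles even though the caller had an ID available and forgot to pass it.
    val forgotten = createLogWithDefault(isNew = true)
    // The explicit form would not compile without the second argument.
    val passed = createLogExplicit(isNew = true, topicId = Some(id))
    println(s"default: $forgotten, explicit: $passed")
  }
}
```

   The trade-off is the one mentioned above: removing the default touches every call site, including tests, but each of those call sites then states its choice.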







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597973607



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,31 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {
+          if (receivedTopicId != logTopicId) {
+            // not sure if we need both the logger and the error thrown
+            stateChangeLogger.error(s"Topic Id in memory: $logTopicId does not" +

Review comment:
       Seems like most other logs use "topic ID" so I'll go with that for now.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605142418



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       This is a good topic to decide on for https://github.com/apache/kafka/pull/9590







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597930758



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       There was some discussion on this topic. I think the delete request must be handled by the controller though? Let me know if this is not the case.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597999385



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)

Review comment:
       It seems that `!topicId.contains(fileTopicId)` on its own returns true when the option is None, so I'll use `topicId.isDefined && !topicId.contains(fileTopicId)`.
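
A minimal sketch of the `Option` semantics behind this comment. It assumes `java.util.UUID` as a stand-in for Kafka's `Uuid`; the object and method names are hypothetical and are not part of the PR.

```scala
import java.util.UUID

object TopicIdMismatchSketch {
  // Without the isDefined guard: None.contains(x) is false, so the negation is true
  // and a missing ID would be reported as a mismatch.
  def unguarded(provided: Option[UUID], onFile: UUID): Boolean =
    !provided.contains(onFile)

  // Guarded form from the review comment: true only when an ID was provided
  // and it differs from the ID on file.
  def isMismatch(provided: Option[UUID], onFile: UUID): Boolean =
    provided.isDefined && !provided.contains(onFile)

  // Equivalent shorter form: exists is false for None.
  def isMismatchAlt(provided: Option[UUID], onFile: UUID): Boolean =
    provided.exists(_ != onFile)
}
```

The `exists` variant expresses the same check in one call, which may be easier to read than the guard plus negation.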







[GitHub] [kafka] rondagostino commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r590548220



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
    * @return true if the request topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {
+    // If the request had an invalid topic ID, then we assume that topic IDs are not supported so ID is consistent.
+    // This is only the case when from LeaderAndIsr Request. Raft code should never have invalid topic IDs.
+    if (receivedTopicId == null || receivedTopicId == Uuid.ZERO_UUID) {
+      if (usingRaft) false else true

Review comment:
       Maybe just `!usingRaft // only okay when not using Raft`
   
   Is this okay when using ZooKeeper with interBrokerProtocolVersion >= KAFKA_2_8_IV0?
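
A small sketch of the simplification suggested here. The object and method names are hypothetical; only the two expressions come from the diff and the comment.

```scala
object UnsetIdResultSketch {
  // Form in the diff: an unset topic ID is treated as consistent only outside Raft.
  def verbose(usingRaft: Boolean): Boolean =
    if (usingRaft) false else true

  // Suggested form: the same truth table, written as a negation.
  def simplified(usingRaft: Boolean): Boolean = !usingRaft
}
```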

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1369,7 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              if (!partition.checkOrSetTopicId(requestTopicId, false)) {

Review comment:
       `s/false/usingRaft = false/` for clarity?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
    * @return true if the request topic id is consistent, false otherwise

Review comment:
       s/request/received/

##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =
-    interBrokerProtocolVersion >= KAFKA_2_8_IV0
+    usesSelfManagedQuorum || (requiresZookeeper && interBrokerProtocolVersion >= KAFKA_2_8_IV0)

Review comment:
       `requiresZookeeper` is always the inverse of `usesSelfManagedQuorum`, so this can simply be `usesSelfManagedQuorum || interBrokerProtocolVersion >= KAFKA_2_8_IV0`
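
The equivalence can be checked exhaustively. This sketch models `requiresZookeeper` as the negation of `usesSelfManagedQuorum`, as stated in the comment, and reduces the IBP comparison to a plain boolean; the object and parameter names are hypothetical.

```scala
object UsesTopicIdSketch {
  // Form in the diff, with requiresZookeeper modelled as !selfManaged.
  def original(selfManaged: Boolean, ibpAtLeast28: Boolean): Boolean = {
    val requiresZookeeper = !selfManaged
    selfManaged || (requiresZookeeper && ibpAtLeast28)
  }

  // Suggested form without the redundant conjunct.
  def simplified(selfManaged: Boolean, ibpAtLeast28: Boolean): Boolean =
    selfManaged || ibpAtLeast28

  // Compare both forms over all four input combinations.
  def equivalent: Boolean = {
    val bools = Seq(true, false)
    bools.forall(q => bools.forall(v => original(q, v) == simplified(q, v)))
  }
}
```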

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -251,6 +251,14 @@ class RaftReplicaManager(config: KafkaConfig,
               (Some(Partition(topicPartition, time, configRepository, this)), None)
           }
           partition.foreach { partition =>
+            builder.topicNameToId(partition.topic) match {
+              case Some(id) =>
+                if (!partition.checkOrSetTopicId(id, true))

Review comment:
       `s/true/usingRaft = true/` to make it clear what this boolean is?  Also, since this same section of code appears twice, maybe refactor it out into a helper method?
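
One possible shape for such a helper, as a sketch only. `PartitionLike` and `ensureConsistentTopicId` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and the exception types are placeholders rather than the ones the PR uses.

```scala
import java.util.UUID

object TopicIdCheckHelperSketch {
  // Hypothetical stand-in for the parts of Partition used by the check.
  trait PartitionLike {
    def topic: String
    def checkOrSetTopicId(receivedTopicId: UUID, usingRaft: Boolean): Boolean
  }

  // Helper that both the deferring and non-deferring paths could call.
  // topicNameToId models builder.topicNameToId from the diff.
  def ensureConsistentTopicId(partition: PartitionLike,
                              topicNameToId: String => Option[UUID]): Unit =
    topicNameToId(partition.topic) match {
      case Some(id) =>
        // The named argument makes the meaning of the boolean visible at the call site.
        if (!partition.checkOrSetTopicId(id, usingRaft = true))
          throw new IllegalStateException(s"Inconsistent topic ID for topic ${partition.topic}")
      case None =>
        throw new IllegalArgumentException(s"No topic ID found for topic ${partition.topic}")
    }
}
```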







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r598142963



##########
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##########
@@ -1892,8 +1892,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  // Topic IDs are used with all self-managed quorum clusters and ZK cluster with IBP greater than or equal to 2.8
   def usesTopicId: Boolean =

Review comment:
       Hmm. In that case, though, wouldn't we be going to an old version that doesn't support topic IDs, so this behavior would be expected? I think the main reason I wanted to differentiate was so that a client would know not to keep retrying the request with topic IDs on an old version. But maybe this case is not common enough to warrant this error.







[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596389884



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       Yes, I was thinking we would update the topicId in `createLogIfNotExists`.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596246225



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       This covers the case where the log already exists without a topic ID. Before I added this, the partition.metadata file was not written on the first LeaderAndIsr request.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r592824939



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Yeah. We don't even "update" in the ZK code path, but we do wait to create the file. If we have the topic ID available when we first create the log, it would make more sense to write the file at that point. The code path in RaftReplicaManager could just be checking the log.







[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r592823206



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       I think the path through here makes sense when we're talking about the old zk logic in which we may or may not have an existing topicId and we rely on the controller to tell us. However, for the quorum controller, we should always have the topic ID when we create the `Log` object, so it feels strange to need to update it. That makes me think we should be looking at the paths that lead to `LogManager.getOrCreateLog`. Does it make sense to pass through topicId at that point?







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r596248900



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       This is also in the case of restarting the broker.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,11 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)
 
-              if (!partition.checkOrSetTopicId(requestTopicId)) {
+              val (topicIdOpt, partitionLogOpt) = partition.topicIdAndLog
+              if (!checkTopicId(requestTopicId, topicIdOpt, partitionLogOpt)) {

Review comment:
       This also applies when restarting the broker, as the log is not yet associated with the partition.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593330038



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmm. Looking into this further, it seems that the log is created in RaftReplicaManager when making a leader or a follower. This occurs in the `handleMetadataRecords` method (non-deferring case) and the `endMetadataChangeDeferral` method. Each goes through a few methods before reaching `LogManager.getOrCreateLog`, so passing a topic ID through is not ideal (especially since some of this code is shared with the ZK path), but it is not impossible.
   
   One thing I am wondering, though, is how I tested the changes. I think I assumed the log was already created, but this code suggests that the log will not be created until `makeLeaders/Followers` in the non-deferring case, and not until `endMetadataChangeDeferral` in the deferring case. This means that where I set topic IDs now, we won't have a log on the first pass! So I think it is worth passing the ID through these methods. I think I can keep the check of the topic ID where it is, though.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r594552602



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -308,27 +308,29 @@ class Partition(val topicPartition: TopicPartition,
                 s"different from the requested log dir $logDir")
             false
           case None =>
-            createLogIfNotExists(isNew = false, isFutureReplica = true, highWatermarkCheckpoints)
+            // not sure if topic ID should be none here, but not sure if we have access in ReplicaManager where this is called.

Review comment:
       TODO: remove this when we decide if we want to pass in None or a topicID







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605847062



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,10 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)

Review comment:
       Ah, this will simplify `hasConsistentTopicId`







[GitHub] [kafka] hachikuji merged pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10282:
URL: https://github.com/apache/kafka/pull/10282


   





[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605144053



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -262,6 +266,7 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           private val hadCleanShutdown: Boolean = true,
+          @volatile var topicId: Option[Uuid],
           val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {

Review comment:
       I'm wondering about the ZK controller case here. Previously, Uuid.ZERO_UUID represented an unset ID, but I thought it was too easy to mistake it for a valid ID.
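
A minimal sketch of replacing a sentinel value with an `Option`. It assumes `java.util.UUID` as a stand-in for Kafka's `Uuid` and an all-zero value as the stand-in for `Uuid.ZERO_UUID`; the object and method names are hypothetical.

```scala
import java.util.UUID

object UnsetTopicIdSketch {
  // Stand-in for Uuid.ZERO_UUID (assumption: the all-zero UUID marks "no ID").
  val ZeroUuid: UUID = new UUID(0L, 0L)

  // Convert a possibly-null or all-zero ID into an Option, so callers
  // cannot pass the sentinel along as if it were a real topic ID.
  def toOption(id: UUID): Option[UUID] =
    Option(id).filter(_ != ZeroUuid)
}
```

With this shape, "no topic ID" is visible in the type, and a caller has to handle `None` explicitly instead of comparing against a magic value.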







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605172888



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -262,6 +266,7 @@ class Log(@volatile private var _dir: File,
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
           private val hadCleanShutdown: Boolean = true,
+          @volatile var topicId: Option[Uuid],
           val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {

Review comment:
       Oh. I misunderstood. Were you referring to `keepPartitionMetadataFile`?







[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605109950



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,13 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
+      if (log.topicId == None) {

Review comment:
       nit: use `isEmpty`?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -341,9 +341,13 @@ class Partition(val topicPartition: TopicPartition,
     logManager.initializingLog(topicPartition)
     var maybeLog: Option[Log] = None
     try {
-      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica)
+      val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
       maybeLog = Some(log)
       updateHighWatermark(log)
+      // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
+      if (log.topicId == None) {
+        topicId.foreach(topicId => log.writeTopicIdToExistingLog(topicId))

Review comment:
       nit: how about `assignTopicId` instead of `writeTopicIdToExistingLog`?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {

Review comment:
       Ok, I think I get it.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,21 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && !topicId.contains(fileTopicId))
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
+              s"but log already contained topic ID $fileTopicId")
+          topicId = Some(fileTopicId)
+        }
+    } else if (topicId.isDefined && keepPartitionMetadataFile) {
+      partitionMetadataFile.write(topicId.get)

Review comment:
       nit: we can use `foreach`
   ```scala
   } else if (keepPartitionMetadataFile) {
     topicId.foreach(partitionMetadataFile.write)
   }
   ```
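
The initialization logic in this hunk can be sketched end to end with an in-memory stand-in for the partition.metadata file. `MetadataFileLike` and `reconcile` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and no disk I/O is performed; this illustrates the branching only and is not the PR's implementation.

```scala
import java.util.UUID

object PartitionMetadataSketch {
  // In-memory stand-in for the partition.metadata file.
  final class MetadataFileLike(private var stored: Option[UUID] = None) {
    def exists: Boolean = stored.isDefined
    def read(): UUID = stored.get
    def write(id: UUID): Unit = { stored = Some(id) }
    def delete(): Unit = { stored = None }
  }

  // Returns the topic ID the log ends up with after initialization.
  def reconcile(file: MetadataFileLike, provided: Option[UUID], keepFile: Boolean): Option[UUID] =
    if (file.exists) {
      if (!keepFile) {
        // Topic IDs are not supported: drop the file, keep whatever was provided.
        file.delete()
        provided
      } else {
        val fileTopicId = file.read()
        // A provided ID must agree with the ID already on file.
        if (provided.exists(_ != fileTopicId))
          throw new IllegalStateException(
            s"Tried to assign topic ID $provided, but log already contained topic ID $fileTopicId")
        Some(fileTopicId)
      }
    } else {
      // No file yet: write one only if files are kept and an ID was provided.
      if (keepFile) provided.foreach(file.write)
      provided
    }
}
```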

##########
File path: core/src/main/scala/kafka/server/RaftReplicaChangeDelegate.scala
##########
@@ -72,7 +72,8 @@ class RaftReplicaChangeDelegate(helper: RaftReplicaChangeDelegateHelper) {
   def makeLeaders(prevPartitionsAlreadyExisting: Set[MetadataPartition],
                   partitionStates: Map[Partition, MetadataPartition],
                   highWatermarkCheckpoints: OffsetCheckpoints,
-                  metadataOffset: Option[Long]): Set[Partition] = {
+                  metadataOffset: Option[Long],
+                  topicIds: String => Option[Uuid]): Set[Partition] = {

Review comment:
       I guess we could use a strong type here since topicIds are required for KIP-500, but maybe not worth it since we are delegating to `Partition` in the end.
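
For comparison, a sketch of the two lookup shapes discussed here. The type aliases and the `required` adapter are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID

object TopicIdLookupSketch {
  // Shape used in the diff: a missing ID is a representable outcome.
  type OptionalLookup = String => Option[UUID]
  // "Strong" alternative: every topic must resolve to an ID.
  type RequiredLookup = String => UUID

  // Adapter that turns an optional lookup into a required one,
  // failing fast when a topic has no ID.
  def required(lookup: OptionalLookup): RequiredLookup =
    topic => lookup(topic).getOrElse(
      throw new IllegalArgumentException(s"No topic ID found for topic $topic"))
}
```

The optional shape keeps the signature compatible with code that is shared with the ZK path, at the cost of pushing the "missing ID" decision down to the callee.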

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -775,9 +776,10 @@ class LogManager(logDirs: Seq[File],
    * @param topicPartition The partition whose log needs to be returned or created
    * @param isNew Whether the replica should have existed on the broker or not
    * @param isFuture True if the future log of the specified partition should be returned or created
+   * @param topicId The topic ID of the topic used in the case of log creation.
    * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker
    */
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false): Log = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): Log = {

Review comment:
       It seems useful to validate here that when the `Log` already exists, its topicId does not conflict with the parameter.
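
A rough sketch of the validation suggested here, using an in-memory map instead of the real `LogManager`. `LogLike` and `LogManagerLike` are hypothetical names, the partition is keyed by a plain string, and `java.util.UUID` stands in for Kafka's `Uuid`.

```scala
import java.util.UUID
import scala.collection.mutable

object GetOrCreateLogSketch {
  // Minimal stand-in for Log: only the topic ID matters here.
  final class LogLike(val topicId: Option[UUID])

  final class LogManagerLike {
    private val logs = mutable.Map.empty[String, LogLike]

    def getOrCreateLog(partition: String, topicId: Option[UUID]): LogLike =
      logs.get(partition) match {
        case Some(existing) =>
          // Conflict only when both IDs are present and differ.
          val conflict = topicId.exists(requested => existing.topicId.exists(_ != requested))
          if (conflict)
            throw new IllegalStateException(
              s"Topic ID $topicId conflicts with existing log topic ID ${existing.topicId} for $partition")
          existing
        case None =>
          val created = new LogLike(topicId)
          logs.put(partition, created)
          created
      }
  }
}
```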

##########
File path: jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
##########
@@ -145,7 +145,7 @@ public void setup() {
         OffsetCheckpoints checkpoints = (logDir, topicPartition) -> Option.apply(0L);
         for (TopicPartition topicPartition : topicPartitions) {
             final Partition partition = this.replicaManager.createPartition(topicPartition);
-            partition.createLogIfNotExists(true, false, checkpoints);
+            partition.createLogIfNotExists(true, false, checkpoints, Option.empty());

Review comment:
       Seems no harm either way, but why not set one anyway since that will be the default behavior going forward?

##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,30 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {

Review comment:
       nit: usually we write this like this:
   ```scala
   logTopicIdOpt.foreach { logTopicId => 
   ```

##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1633,10 +1633,102 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeLeader(leaderState, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints, None)
     assertEquals(4, partition.localLogOrException.highWatermark)
   }
 
+  @Test
+  def testTopicIdAndPartitionMetadataFileForLeader(): Unit = {
+    val controllerEpoch = 3
+    val leaderEpoch = 5
+    val topicId = Uuid.randomUuid()
+    val replicas = List[Integer](brokerId, brokerId + 1).asJava
+    val leaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(brokerId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(false)
+    partition.makeLeader(leaderState, offsetCheckpoints, Some(topicId))
+
+    checkTopicId(topicId, partition)
+
+    // Create new Partition object for same topicPartition
+    val partition2 = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      localBrokerId = brokerId,
+      time,
+      isrChangeListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterIsrManager)
+
+    // partition2 should not yet be associated with the log, but should be able to get ID
+    assertTrue(partition2.topicId.isDefined)
+    assertEquals(topicId, partition2.topicId.get)
+    assertFalse(partition2.log.isDefined)
+
+    // Calling makeLeader with a new topic ID should not overwrite the old topic ID. We should get the same log.

Review comment:
       Do we have test cases for the InconsistentTopicId scenario as well?







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605143191



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -775,9 +776,10 @@ class LogManager(logDirs: Seq[File],
    * @param topicPartition The partition whose log needs to be returned or created
    * @param isNew Whether the replica should have existed on the broker or not
    * @param isFuture True if the future log of the specified partition should be returned or created
+   * @param topicId The topic ID of the topic used in the case of log creation.
    * @throws KafkaStorageException if isNew=false, log is not found in the cache and there is offline log directory on the broker
    */
-  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false): Log = {
+  def getOrCreateLog(topicPartition: TopicPartition, isNew: Boolean = false, isFuture: Boolean = false, topicId: Option[Uuid]): Log = {

Review comment:
       If it does conflict, throw an error?







[GitHub] [kafka] jolshan edited a comment on pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#issuecomment-801278854


   > I cannot think of a reason not to pass the topic id if we have it available. Otherwise, we would need logic to set it again after the log dir is swapped.
   
   @hachikuji This method is only called for ZK controllers, so it should be able to set it, but I will add the topic ID.





[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r594558352



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -349,11 +352,20 @@ class Log(@volatile private var _dir: File,
 
     // Delete partition metadata file if the version does not support topic IDs.
     // Recover topic ID if present and topic IDs are supported
+    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
+    // write to the partition metadata file.
+    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
     if (partitionMetadataFile.exists()) {
         if (!keepPartitionMetadataFile)
           partitionMetadataFile.delete()
-        else
-          topicId = partitionMetadataFile.read().topicId
+        else {
+          val fileTopicId = partitionMetadataFile.read().topicId
+          if (topicId.isDefined && fileTopicId != topicId.get)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log, but log already contained topicId $fileTopicId")

Review comment:
       I don't know if it is possible to get to this error message. I think in most cases, the log should be grabbed if it already exists in the makeLeader/makeFollower path. In the log loading path, the topicId should be None. I thought it would be good to throw this error to know that something was wrong with the code, but maybe there is a better way. (Like maybe if topicId is defined in general)
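
       To make the cases in the diff above easier to follow, here is a rough sketch of the decision as a pure function. It is not the actual `Log` code: `java.util.UUID` stands in for Kafka's `Uuid`, the `PartitionMetadataInit` object and its `Action` types are hypothetical, and `fileTopicId = None` models the partition.metadata file not existing.

```scala
import java.util.UUID

object PartitionMetadataInit {
  // Hypothetical result type describing what log initialization would do.
  sealed trait Action
  case object DeleteFile extends Action                 // file exists but topic IDs are unsupported
  final case class UseFileId(id: UUID) extends Action   // recover the ID already on file
  final case class WriteFile(id: UUID) extends Action   // no file yet: persist the provided ID
  case object NoTopicId extends Action                  // nothing to read and nothing to write

  def decide(fileTopicId: Option[UUID],
             providedTopicId: Option[UUID],
             keepPartitionMetadataFile: Boolean): Action =
    (fileTopicId, providedTopicId) match {
      case (Some(_), _) if !keepPartitionMetadataFile => DeleteFile
      case (Some(onFile), Some(provided)) if onFile != provided =>
        throw new IllegalStateException(
          s"Tried to assign topic ID $provided to log, but log already contained topic ID $onFile")
      case (Some(onFile), _) => UseFileId(onFile)
      case (None, Some(provided)) if keepPartitionMetadataFile => WriteFile(provided)
      case _ => NoTopicId
    }
}
```

       In this sketch the exception is reachable only when a file exists, topic IDs are supported, and a different ID is passed in at creation time.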







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605182627



##########
File path: core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
##########
@@ -1633,10 +1633,102 @@ class PartitionTest extends AbstractPartitionTest {
       .setZkVersion(1)
       .setReplicas(replicas)
       .setIsNew(false)
-    partition.makeLeader(leaderState, offsetCheckpoints)
+    partition.makeLeader(leaderState, offsetCheckpoints, None)
     assertEquals(4, partition.localLogOrException.highWatermark)
   }
 
+  @Test
+  def testTopicIdAndPartitionMetadataFileForLeader(): Unit = {
+    val controllerEpoch = 3
+    val leaderEpoch = 5
+    val topicId = Uuid.randomUuid()
+    val replicas = List[Integer](brokerId, brokerId + 1).asJava
+    val leaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(brokerId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(replicas)
+      .setZkVersion(1)
+      .setReplicas(replicas)
+      .setIsNew(false)
+    partition.makeLeader(leaderState, offsetCheckpoints, Some(topicId))
+
+    checkTopicId(topicId, partition)
+
+    // Create new Partition object for same topicPartition
+    val partition2 = new Partition(topicPartition,
+      replicaLagTimeMaxMs = Defaults.ReplicaLagTimeMaxMs,
+      interBrokerProtocolVersion = ApiVersion.latestVersion,
+      localBrokerId = brokerId,
+      time,
+      isrChangeListener,
+      delayedOperations,
+      metadataCache,
+      logManager,
+      alterIsrManager)
+
+    // partition2 should not yet be associated with the log, but should be able to get ID
+    assertTrue(partition2.topicId.isDefined)
+    assertEquals(topicId, partition2.topicId.get)
+    assertFalse(partition2.log.isDefined)
+
+    // Calling makeLeader with a new topic ID should not overwrite the old topic ID. We should get the same log.

Review comment:
       The inconsistentTopicId scenario is tested in ReplicaManagerTest/RaftReplicaManagerTest, since the inconsistent ID check logic is in the ReplicaManager, rather than in Partition. 







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmm, this is a little tricky. One idea I had would be to set the ID in the partition object only (not the file or in Log memory). Then, when the partitions are passed into `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store it in the partition **_instead of_** or **_in addition to_** the Log. Not sure if we also want to change this for the ZK code. Thoughts?







[GitHub] [kafka] hachikuji commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r605822416



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -842,6 +842,20 @@ class LogManager(logDirs: Seq[File],
 
         log
       }
+      // When running a ZK controller, we may get a log that does not have a topic ID. Assign it here.
+      if (log.topicId.isEmpty) {
+        topicId.foreach(log.assignTopicId)
+      }
+
+      // Ensure topic IDs are consistent
+      topicId.foreach { topicId =>
+        log.topicId.foreach { logTopicId =>
+          if (topicId != logTopicId)
+            throw new IllegalStateException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +

Review comment:
       Perhaps we may as well use InconsistentTopicIdException?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1369,7 +1377,10 @@ class ReplicaManager(val config: KafkaConfig,
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIds.get(topicPartition.topic)

Review comment:
       Could we simplify this by taking advantage of `getTopicId`?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1326,6 +1326,14 @@ class ReplicaManager(val config: KafkaConfig,
             s"epoch ${leaderAndIsrRequest.controllerEpoch}")
         }
       val topicIds = leaderAndIsrRequest.topicIds()
+      def getTopicId(topicName: String): Option[Uuid] = {

Review comment:
       nit: how about `requestTopicId` instead of `getTopicId` so that it's clear that this comes from the LeaderAndIsr request?
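
       For illustration, one possible shape for such a helper. The body of `getTopicId` is not shown in this hunk, so this is an assumption rather than the PR's code: `java.util.UUID` stands in for Kafka's `Uuid`, `ZeroUuid` stands in for `Uuid.ZERO_UUID`, and the `RequestTopicIds` object is hypothetical.

```scala
import java.util.{Map => JMap, UUID}

object RequestTopicIds {
  // Stand-in for Uuid.ZERO_UUID, which marks "no topic ID" in requests.
  val ZeroUuid: UUID = new UUID(0L, 0L)

  // Looks up the topic ID carried by the request for the given topic.
  // A missing entry (null) or the zero UUID both map to None.
  def requestTopicId(topicIds: JMap[String, UUID])(topicName: String): Option[UUID] =
    Option(topicIds.get(topicName)).filter(_ != ZeroUuid)
}
```

       Partially applying the map, as in `RequestTopicIds.requestTopicId(topicIds) _`, yields a `String => Option[UUID]` function that can be passed along wherever a topic ID lookup is expected.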







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r597931453



##########
File path: core/src/main/scala/kafka/server/RaftReplicaManager.scala
##########
@@ -374,4 +375,31 @@ class RaftReplicaManager(config: KafkaConfig,
     metadataImage.partitions.topicPartition(partition.topic, partition.partitionId).getOrElse(
       throw new IllegalStateException(s"Partition has metadata changes but does not exist in the metadata cache: ${partition.topicPartition}"))
   }
+
+  /**
+   * Checks if the topic ID received from the MetadataPartitionsBuilder is consistent with the topic ID in the log.
+   * If the log does not exist, logTopicIdOpt will be None. In this case, the ID is not inconsistent.
+   *
+   * @param receivedTopicIdOpt the topic ID received from the MetadataRecords if it exists
+   * @param logTopicIdOpt the topic ID in the log if the log exists
+   * @param topicPartition the topicPartition for the Partition being checked
+   * @throws InconsistentTopicIdException if the topic ids are not consistent
+   * @throws IllegalArgumentException if the MetadataPartitionsBuilder did not have a topic ID associated with the topic
+   */
+  private def checkTopicId(receivedTopicIdOpt: Option[Uuid], logTopicIdOpt: Option[Uuid], topicPartition: TopicPartition): Unit = {
+    receivedTopicIdOpt match {
+      case Some(receivedTopicId) =>
+        logTopicIdOpt.foreach(logTopicId => {
+          if (receivedTopicId != logTopicId) {
+            // not sure if we need both the logger and the error thrown
+            stateChangeLogger.error(s"Topic Id in memory: $logTopicId does not" +

Review comment:
       Ah, I think I've been calling it "topic ID" when referring to the concept and "TopicId" when referring to a specific variable. But I didn't follow either convention here. I'll make it more consistent.







[GitHub] [kafka] jolshan commented on a change in pull request #10282: KAFKA-12426: Missing logic to create partition.metadata files in RaftReplicaManager

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10282:
URL: https://github.com/apache/kafka/pull/10282#discussion_r593351308



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -425,36 +425,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   /**
-   * Checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * Checks if the topic ID received is consistent with the topic ID in the log.
    * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
    *
-   * @param requestTopicId the topic ID from the request
-   * @return true if the request topic id is consistent, false otherwise
+   * @param receivedTopicId the topic ID from the LeaderAndIsr request or from the metadata records
+   * @return true if the received topic id is consistent, false otherwise
    */
-  def checkOrSetTopicId(requestTopicId: Uuid): Boolean = {
-    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
-    // The topic ID was not inconsistent, so return true.
-    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
-    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
-      true
-    else {
+  def checkOrSetTopicId(receivedTopicId: Uuid, usingRaft: Boolean): Boolean = {

Review comment:
       Hmm, this is a little tricky. One idea I had would be to check or set the ID in the partition object only (not the file or in Log memory). Then, when the partitions are passed into `makeLeaders/Followers` and a new log is created, we will have the topic ID easily accessible to create the partition.metadata file. It kind of changes how we do things. We would either store it in the partition **_instead of_** or **_in addition to_** the Log. Not sure if we also want to change this for the ZK code. Thoughts?
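
       A rough sketch of the "store it in the partition" idea, under stated assumptions: `PartitionSketch` is a hypothetical, stripped-down class (not Kafka's `Partition`), `java.util.UUID` stands in for Kafka's `Uuid`, and no file or `Log` state is touched.

```scala
import java.util.UUID

// Hypothetical, minimal model of a partition that remembers its topic ID in memory only.
final class PartitionSketch {
  private var topicIdOpt: Option[UUID] = None

  // The ID is readable even before any log has been created for this partition.
  def topicId: Option[UUID] = topicIdOpt

  // Returns true if the received ID is consistent with the stored one.
  // If nothing is stored yet, the received ID is recorded and treated as consistent.
  def checkOrSetTopicId(receivedTopicId: UUID): Boolean = topicIdOpt match {
    case Some(existing) =>
      existing == receivedTopicId
    case None =>
      topicIdOpt = Some(receivedTopicId)
      true
  }
}
```

       With the ID held on the partition, a later log creation step could read `topicId` and write the partition.metadata file without needing the original request or metadata record.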



