Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/17 20:55:17 UTC

[GitHub] [kafka] jolshan opened a new pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

jolshan opened a new pull request #10143:
URL: https://github.com/apache/kafka/pull/10143


   Changes how invalid topic IDs are handled in LeaderAndIsr requests. The topic ID check now occurs before the leader epoch check. If the request provides a topic ID that is inconsistent with the one in the log, the partition is skipped and an UNKNOWN_TOPIC_ID error is returned in the response.
   
   This error should be rare, but if it occurs, it signals the need for manual intervention.
   
   Added tests for this behavior.
   I also plan to rerun the benchmark from https://github.com/apache/kafka/pull/10071 to ensure there are no regressions with this change.
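A minimal, self-contained sketch of the check order described above. It is not the code in this PR: `LisrCheckSketch`, the `Outcome` values and the parameter names are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`, with an all-zero UUID playing the role of `Uuid.ZERO_UUID`.

```scala
import java.util.UUID

object LisrCheckSketch {
  // Stand-in for Uuid.ZERO_UUID: a request carrying it predates topic IDs.
  val ZeroUuid: UUID = new UUID(0L, 0L)

  sealed trait Outcome
  case object Accepted extends Outcome
  case object UnknownTopicId extends Outcome   // corresponds to Errors.UNKNOWN_TOPIC_ID
  case object NotInReplicaList extends Outcome // corresponds to Errors.UNKNOWN_TOPIC_OR_PARTITION
  case object StaleLeaderEpoch extends Outcome // hypothetical label for the "epoch not newer" branch

  def check(
      requestTopicId: UUID,     // null or ZeroUuid when the request comes from an older protocol
      logTopicId: Option[UUID], // None when there is no log or the log has no ID yet
      requestLeaderEpoch: Int,
      currentLeaderEpoch: Int,
      replicas: Seq[Int],
      localBrokerId: Int): Outcome = {
    val requestHasId = requestTopicId != null && requestTopicId != ZeroUuid
    // Topic ID first: a mismatch short-circuits the leader epoch check.
    val inconsistent = requestHasId && logTopicId.exists(_ != requestTopicId)
    if (inconsistent) UnknownTopicId
    else if (requestLeaderEpoch > currentLeaderEpoch) {
      if (replicas.contains(localBrokerId)) Accepted else NotInReplicaList
    } else StaleLeaderEpoch
  }
}
```

Because the topic ID test runs first, a partition with a mismatched ID is reported as UNKNOWN_TOPIC_ID regardless of its leader epoch.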
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji merged pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10143:
URL: https://github.com/apache/kafka/pull/10143


   


----------------------------------------------------------------



[GitHub] [kafka] jolshan edited a comment on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199


   Ran tests locally with the newest code from trunk. Here are the tests that failed:
   ```
   ConsumerBounceTest.testClose()
   ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   ```
   
   These are tests I've found to be flaky when run locally.
   


----------------------------------------------------------------



[GitHub] [kafka] jolshan commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578007354



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
+                    invalidId = true
+                  }
+                }
+              }
+
+              // If we found an invalid ID, we don't need to check the leader epoch
+              if (!invalidId) {
+                if (requestLeaderEpoch > currentLeaderEpoch) {

Review comment:
       Oh, hmm, I see how this could be used on the path that actually sets it. I'll think of a good name.
   




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r577978644



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
+                    invalidId = true
+                  }
+                }
+              }
+
+              // If we found an invalid ID, we don't need to check the leader epoch
+              if (!invalidId) {
+                if (requestLeaderEpoch > currentLeaderEpoch) {

Review comment:
       It's a minor thing, but we can avoid this nesting by restructuring the checks a little bit. For example, it would be a good idea to have a helper in `Partition` which encapsulates the update of the topicId state in `Log`. Maybe something like this:
   
   ```scala
   class Partition {
     // Update the topic ID if necessary.
     // Return false if the update failed because the topicId is inconsistent
     def maybeUpdateTopicId(topicId: Uuid): Boolean
   }
   
   // in ReplicaManager
   if (!partition.maybeUpdateTopicId(requestTopicId)) {
     error(...)
     responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
   } else if (requestLeaderEpoch > currentLeaderEpoch) {
   ...
   ```




----------------------------------------------------------------



[GitHub] [kafka] jolshan commented on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781534358


   @hachikuji I was wondering if I should do that. I think the mailer daemon has been giving me issues when sending to DISCUSS, so I'll send to VOTE this time.


----------------------------------------------------------------



[GitHub] [kafka] jolshan commented on pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#issuecomment-781732199


   Ran tests locally with the newest code from trunk. Here are the tests that failed:
   ConsumerBounceTest.testClose()
   ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
   DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
   
   These are tests I've found to be flaky when run locally.
   


----------------------------------------------------------------



[GitHub] [kafka] jolshan commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578010946



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
+                    invalidId = true
+                  }
+                }
+              }
+
+              // If we found an invalid ID, we don't need to check the leader epoch
+              if (!invalidId) {
+                if (requestLeaderEpoch > currentLeaderEpoch) {

Review comment:
       In this case, if `log` is `None`, does it make sense to return an error here? If it is `None`, we are unable to check the topic ID.
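One way to reason about the `None` case is to make "cannot check" an explicit outcome and let the caller decide how strict to be. The sketch below is hypothetical: `TopicIdCheck`, `LogView` and `Result` are illustrative names, and `java.util.UUID` stands in for Kafka's `Uuid`. Letting `Unverifiable` through, as `shouldReject` does here, matches the later revision of `checkOrSetTopicId` in this PR, which returns true when the log is empty.

```scala
import java.util.UUID

object TopicIdCheck {
  val ZeroUuid: UUID = new UUID(0L, 0L) // stand-in for Uuid.ZERO_UUID

  // Hypothetical stand-in for Log: ZeroUuid means "no topic ID recorded yet".
  final case class LogView(topicId: UUID)

  sealed trait Result
  case object Consistent extends Result
  case object Inconsistent extends Result
  case object Unverifiable extends Result // no log, so there is nothing to compare against

  def compare(requestTopicId: UUID, log: Option[LogView]): Result =
    if (requestTopicId == null || requestTopicId == ZeroUuid) Consistent // older protocol: nothing to check
    else log match {
      case None => Unverifiable
      case Some(l) if l.topicId == ZeroUuid || l.topicId == requestTopicId => Consistent
      case Some(_) => Inconsistent
    }

  // Only a definite mismatch is rejected; Unverifiable is let through.
  def shouldReject(r: Result): Boolean = r == Inconsistent
}
```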




----------------------------------------------------------------



[GitHub] [kafka] jolshan commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r577986110



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)

Review comment:
       That makes sense.




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578575340



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,11 +1364,19 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
+              val requestTopicId = topicIds.get(topicPartition.topic)
+              val (consistentTopicId, logTopicId) = partition.checkOrSetTopicId(requestTopicId)

Review comment:
       The return type here is a little awkward. It looks like we only need `logTopicId` for the log message below. Since `Partition` also has access to the state change logger, how about we move it into `checkOrSetTopicId`?

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -428,6 +428,35 @@ class Partition(val topicPartition: TopicPartition,
       this.log = Some(log)
   }
 
+  /**
+   * This method checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * If a valid topic ID is provided, but the log has no ID set, set the log ID to be the request ID.
+   * Returns a boolean representing whether the topic ID was consistent and the final log ID if it exists.
+   */
+  def checkOrSetTopicId(requestTopicId: Uuid): (Boolean, Option[Uuid]) = {
+    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
+    // The topic ID was not inconsistent, so return true.
+    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
+      (true, None)
+    else if (log.isEmpty)

Review comment:
       nit: whenever you see an `isEmpty` or `isDefined` followed by a `get`, it's a good sign that we could probably simplify with a `match`. For example:
   ```scala
   else {
     log match {
       case None => (true, None)
       case Some(log) =>
   ...
   ```
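Putting the two suggestions together (log inside `checkOrSetTopicId`, and `match` instead of `isEmpty`/`get`), the method could look roughly like the sketch below. `MiniLog`, `MiniPartition` and the `warn` callback are hypothetical stand-ins for `Log`, `Partition` and the state change logger; `writtenIds` only records what would be written to the partition metadata file, and `java.util.UUID` replaces Kafka's `Uuid`.

```scala
import java.util.UUID
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Log; an all-zero topicId means no ID has been set.
final class MiniLog(var topicId: UUID) {
  // Records what would be written to the partition metadata file.
  val writtenIds: ArrayBuffer[UUID] = ArrayBuffer.empty[UUID]
}

// Hypothetical stand-in for Partition; warn replaces the state change logger.
final class MiniPartition(var log: Option[MiniLog], warn: String => Unit) {
  private val ZeroUuid = new UUID(0L, 0L) // stand-in for Uuid.ZERO_UUID

  /**
   * Checks the request topic ID against the log, setting it on the log if none is set yet.
   *
   * @param requestTopicId topic ID from the LeaderAndIsr request; null or zero if unsupported
   * @return true if the request topic id is consistent, false otherwise
   */
  def checkOrSetTopicId(requestTopicId: UUID): Boolean =
    if (requestTopicId == null || requestTopicId == ZeroUuid) true
    else log match {
      case None => true // no log, so we cannot say the ID is inconsistent
      case Some(l) if l.topicId == ZeroUuid =>
        l.writtenIds += requestTopicId // stands in for partitionMetadataFile.write
        l.topicId = requestTopicId
        true
      case Some(l) if l.topicId == requestTopicId => true
      case Some(l) =>
        warn(s"Topic Id in memory: ${l.topicId} does not match the topic Id provided in the request: $requestTopicId.")
        false
    }
}
```

Returning a plain `Boolean` keeps the call site in `ReplicaManager` to a single `if`, and the log message no longer needs the log's topic ID to be passed back out.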




----------------------------------------------------------------



[GitHub] [kafka] hachikuji commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578638672



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -428,6 +428,43 @@ class Partition(val topicPartition: TopicPartition,
       this.log = Some(log)
   }
 
+  /**
+   * This method checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
+   * Returns a boolean representing whether the topic ID was consistent.

Review comment:
       nit: turn this into scaladoc and document the return value:
   ```scala
   * @return true if the request topic id is consistent, false otherwise
   ```




----------------------------------------------------------------



[GitHub] [kafka] jolshan commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r577985597



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
+                    invalidId = true
+                  }
+                }
+              }
+
+              // If we found an invalid ID, we don't need to check the leader epoch
+              if (!invalidId) {
+                if (requestLeaderEpoch > currentLeaderEpoch) {

Review comment:
       I was struggling with this since I didn't think it was the best solution. I'll take a look at the version you have here. Would we ever want to update the topic ID, though? Maybe we just need a method that checks whether they are equal.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r577972328



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +

Review comment:
       I think we can even call this an error. The current state machine in the controller should prevent this from happening. When it does not, it is a bug.
   
   Also, can we mention the partition name in this log message? 
   
   Finally, one nit: it is not necessary to call `toString` explicitly in log statements, because string interpolation calls it implicitly. So we can replace `${id.toString}` with `$id`, for example.
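The sketch below shows what the requested message could look like. It assumes `java.util.UUID` as a stand-in for Kafka's `Uuid`, and `inconsistentTopicIdMessage` is a hypothetical helper. In the real code, the string would be passed to `stateChangeLogger` at error level. The message names the partition and relies on interpolation to call `toString` implicitly.

```scala
import java.util.UUID

// Hypothetical helper; java.util.UUID stands in for Kafka's Uuid.
// The returned string would be logged at error level by the state-change logger.
object TopicIdLogSketch {
  // `$logTopicId` and `$requestTopicId` call toString implicitly,
  // so no explicit `.toString` is needed inside the interpolated string.
  def inconsistentTopicIdMessage(topicPartition: String, logTopicId: UUID, requestTopicId: UUID): String =
    s"Topic ID in memory: $logTopicId for partition $topicPartition does not match " +
      s"the topic ID provided in the request: $requestTopicId."
}
```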

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())

Review comment:
       nit: unnecessary parentheses after `topic`. Also, can we name this explicitly `topicId`? Perhaps even `requestTopicId`, to emphasize that this is the one from the request.

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
+                    invalidId = true
+                  }
+                }
+              }
+
+              // If we found an invalid ID, we don't need to check the leader epoch
+              if (!invalidId) {
+                if (requestLeaderEpoch > currentLeaderEpoch) {

Review comment:
       It's a minor thing, but we can avoid this nesting by restructuring the checks a little. For example, it would be a good idea to have a helper in `Partition` that encapsulates the update of the topicId state. Maybe something like this:
   
   ```scala
   class Partition {
     // Update topicId if necessary.
     // Returns false if the update failed because the topicId is inconsistent
     def maybeUpdateTopicId(topicId: Uuid): Boolean
   }
   
   // in ReplicaManager
   if (!partition.maybeUpdateTopicId(requestTopicId)) {
     error(...)
     responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)
   } else if (requestLeaderEpoch > currentLeaderEpoch) {
   ...
   ```
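The suggested restructuring could look roughly like the minimal sketch below. It is not the merged implementation, and it rests on several assumptions:

- `java.util.UUID` stands in for Kafka's `Uuid`.
- `Log` and `Partition` are simplified hypothetical classes. The real code also writes the partition metadata file.
- The replica-list check is omitted.
- The returned strings are placeholders for the response errors.

```scala
import java.util.UUID

// Minimal sketch of the suggested Partition.maybeUpdateTopicId helper.
// Assumptions: java.util.UUID stands in for Kafka's Uuid; Log and Partition are
// simplified hypothetical classes; result strings are placeholders, not Kafka's Errors.
object MaybeUpdateTopicIdSketch {
  val ZeroUuid: UUID = new UUID(0L, 0L)

  final class Log(var topicId: UUID = ZeroUuid)

  final class Partition(val log: Option[Log]) {
    // Returns false only if the log already holds a different, non-zero topic ID.
    def maybeUpdateTopicId(requestTopicId: UUID): Boolean =
      // A null or zero ID means the request came from an older protocol version.
      if (requestTopicId == null || requestTopicId == ZeroUuid) true
      else log match {
        case None => true // No log yet, so there is nothing to be inconsistent with.
        case Some(l) if l.topicId == ZeroUuid =>
          l.topicId = requestTopicId // Adopt the ID (the real code would also persist it).
          true
        case Some(l) => l.topicId == requestTopicId
      }
  }

  // Flattened caller logic: the topic ID check short-circuits the epoch check,
  // so no separate invalidId flag or extra nesting is needed.
  def handle(partition: Partition, requestTopicId: UUID,
             requestLeaderEpoch: Int, currentLeaderEpoch: Int): String =
    if (!partition.maybeUpdateTopicId(requestTopicId)) "UNKNOWN_TOPIC_ID"
    else if (requestLeaderEpoch > currentLeaderEpoch) "ACCEPT"
    else "IGNORED_STALE_EPOCH"
}
```

The design point is that the mutation and the consistency check live together in `Partition`, so the caller becomes a single `if` / `else if` chain.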

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,34 +1364,59 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
-                // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
-                // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
-                if (partitionState.replicas.contains(localBrokerId))
-                  partitionStates.put(partition, partitionState)
-                else {
-                  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
-                    s"correlation id $correlationId epoch $controllerEpoch for partition $topicPartition as itself is not " +
-                    s"in assigned replica list ${partitionState.replicas.asScala.mkString(",")}")
-                  responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION)
+              val id = topicIds.get(topicPartition.topic())
+              var invalidId = false
+
+              // Ensure we have not received a request from an older protocol
+              if (id != null && id != Uuid.ZERO_UUID) {
+                partition.log.foreach { log =>
+                  // Check if topic ID is in memory, if not, it must be new to the broker and does not have a metadata file.
+                  // This is because if the broker previously wrote it to file, it would be recovered on restart after failure.
+                  if (log.topicId == Uuid.ZERO_UUID) {
+                    log.partitionMetadataFile.write(id)
+                    log.topicId = id
+                    // Warn if the topic ID in the request does not match the log.
+                  } else if (log.topicId != id) {
+                    stateChangeLogger.warn(s"Topic Id in memory: ${log.topicId.toString} does not" +
+                      s" match the topic Id provided in the request: " +
+                      s"${id.toString}.")
+                    responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_ID)

Review comment:
       I am a little doubtful about reusing this error code since this case does not quite match the one it was intended for. I wonder if it would be worth having an explicit `INCONSISTENT_TOPIC_ID` error code?
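The minimal sketch below illustrates the distinction being drawn: a missing ID and a mismatched ID become separate outcomes instead of sharing one error code. It uses a hypothetical sealed trait rather than Kafka's actual `Errors` enum, and `java.util.UUID` in place of Kafka's `Uuid`.

```scala
import java.util.UUID

// Illustrative only: these case objects are hypothetical and are not
// Kafka's org.apache.kafka.common.protocol.Errors values.
object TopicIdErrorSketch {
  sealed trait TopicIdCheck
  case object Consistent extends TopicIdCheck
  case object UnknownTopicId extends TopicIdCheck      // no ID is known for the topic
  case object InconsistentTopicId extends TopicIdCheck // an ID is known but differs from the request

  def classify(knownTopicId: Option[UUID], requestTopicId: UUID): TopicIdCheck =
    knownTopicId match {
      case None => UnknownTopicId
      case Some(id) if id == requestTopicId => Consistent
      case Some(_) => InconsistentTopicId
    }
}
```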







[GitHub] [kafka] hachikuji commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578638672



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -428,6 +428,43 @@ class Partition(val topicPartition: TopicPartition,
       this.log = Some(log)
   }
 
+  /**
+   * This method checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * If a valid topic ID is provided, and the log exists but has no ID set, set the log ID to be the request ID.
+   * Returns a boolean representing whether the topic ID was consistent.

Review comment:
       nit: turn this into Scaladoc and emphasize what the returned value indicates, for example:
   ```scala
   * @return true if the request topic id is consistent, false otherwise
   ```
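For reference, a Scaladoc comment with `@param` and `@return` tags might look like the sketch below. The method `isTopicIdConsistent` is hypothetical, as is the use of `java.util.UUID` for Kafka's `Uuid`. It only performs the equality check and does not set the log's topic ID.

```scala
import java.util.UUID

// Hypothetical example of Scaladoc tags; java.util.UUID stands in for Kafka's Uuid.
object ScaladocSketch {
  private val ZeroUuid = new UUID(0L, 0L)

  /**
   * Checks whether the topic ID in a LeaderAndIsr request is consistent with
   * the topic ID already recorded for the log.
   *
   * @param requestTopicId the topic ID from the request; null or zero means the
   *                       sender does not support topic IDs
   * @param logTopicId     the topic ID recorded for the log, if a log exists
   * @return true if the request topic ID is consistent, false otherwise
   */
  def isTopicIdConsistent(requestTopicId: UUID, logTopicId: Option[UUID]): Boolean =
    requestTopicId == null || requestTopicId == ZeroUuid ||
      logTopicId.forall(id => id == ZeroUuid || id == requestTopicId)
}
```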







[GitHub] [kafka] jolshan commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578577789



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1364,11 +1364,19 @@ class ReplicaManager(val config: KafkaConfig,
                 Some(partition)
             }
 
-            // Next check partition's leader epoch
+            // Next check the topic ID and the partition's leader epoch
             partitionOpt.foreach { partition =>
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
-              if (requestLeaderEpoch > currentLeaderEpoch) {
+              val requestTopicId = topicIds.get(topicPartition.topic)
+              val (consistentTopicId, logTopicId) = partition.checkOrSetTopicId(requestTopicId)

Review comment:
       Good point. I didn't realize it had access.







[GitHub] [kafka] jolshan commented on a change in pull request #10143: KAFKA-12332: Error partitions from topics with invalid IDs in LISR requests

Posted by GitBox <gi...@apache.org>.
jolshan commented on a change in pull request #10143:
URL: https://github.com/apache/kafka/pull/10143#discussion_r578577351



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -428,6 +428,35 @@ class Partition(val topicPartition: TopicPartition,
       this.log = Some(log)
   }
 
+  /**
+   * This method checks if the topic ID provided in the request is consistent with the topic ID in the log.
+   * If a valid topic ID is provided, but the log has no ID set, set the log ID to be the request ID.
+   * Returns a boolean representing whether the topic ID was consistent and the final log ID if it exists.
+   */
+  def checkOrSetTopicId(requestTopicId: Uuid): (Boolean, Option[Uuid]) = {
+    // If the request had an invalid topic ID, then we assume that topic IDs are not supported.
+    // The topic ID was not inconsistent, so return true.
+    // If the log is empty, then we can not say that topic ID is inconsistent, so return true.
+    if (requestTopicId == null || requestTopicId == Uuid.ZERO_UUID)
+      (true, None)
+    else if (log.isEmpty)

Review comment:
       I had that in the back of my head but couldn't quite remember. Thanks for reminding. :)



