Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/26 18:56:36 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #11126: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0

hachikuji commented on a change in pull request #11126:
URL: https://github.com/apache/kafka/pull/11126#discussion_r676863216



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1367,13 +1367,23 @@ class ReplicaManager(val config: KafkaConfig,
               val currentLeaderEpoch = partition.getLeaderEpoch
               val requestLeaderEpoch = partitionState.leaderEpoch
               val requestTopicId = topicIdFromRequest(topicPartition.topic)
+              val logTopicId = partition.topicId
+
+              // We propagate the partition state down if:
+              // 1. The leader epoch is higher than the current leader epoch of the partition
+              // 2. The leader epoch is same as the current leader epoch but a new topic id is being assigned. This is
+              //    needed to handle the case where a topic id is assigned for the first time after upgrade.
+              def propagatePartitionState(requestLeaderEpoch: Int, currentLeaderEpoch: Int): Boolean = {
+                requestLeaderEpoch > currentLeaderEpoch ||
+                  (requestLeaderEpoch == currentLeaderEpoch && logTopicId.isEmpty && requestTopicId.isDefined)
+              }
 
-              if (!hasConsistentTopicId(requestTopicId, partition.topicId)) {
-                stateChangeLogger.error(s"Topic ID in memory: ${partition.topicId.get} does not" +
+              if (!hasConsistentTopicId(requestTopicId, logTopicId)) {
+                stateChangeLogger.error(s"Topic ID in memory: ${logTopicId.get} does not" +
                   s" match the topic ID for partition $topicPartition received: " +
                   s"${requestTopicId.get}.")
                 responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
-              } else if (requestLeaderEpoch > currentLeaderEpoch) {
+              } else if (propagatePartitionState(requestLeaderEpoch, currentLeaderEpoch)) {

Review comment:
       I'm a bit concerned about the transition through `Partition.makeLeader` when there is no epoch bump. For example, the logic to update the epoch cache assumes that the epoch has indeed been bumped. If we end up overwriting the start offset of the current epoch, that could affect the truncation logic.
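
To make the concern concrete, here is a minimal sketch of how re-assigning an epoch that has not been bumped could move its recorded start offset. `EpochEntry`, `SimpleEpochCache`, `assignNaive`, `assignGuarded` and `endOffsetFor` are hypothetical names for illustration only. They are not Kafka's actual epoch cache API, and the sketch does not claim the real cache behaves like either variant. It assumes followers truncate based on the end offset the leader reports for an older epoch.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a leader epoch cache (not Kafka's real class).
final case class EpochEntry(epoch: Int, startOffset: Long)

final class SimpleEpochCache {
  // Entries are kept in ascending epoch order.
  private val entries = mutable.ArrayBuffer.empty[EpochEntry]

  // Naive assign: assumes each call carries a newer epoch, so an existing
  // entry for the same epoch is silently replaced.
  def assignNaive(epoch: Int, startOffset: Long): Unit = {
    val idx = entries.indexWhere(_.epoch == epoch)
    if (idx >= 0) entries(idx) = EpochEntry(epoch, startOffset)
    else entries += EpochEntry(epoch, startOffset)
  }

  // Guarded assign: records an entry only when the epoch is actually bumped.
  def assignGuarded(epoch: Int, startOffset: Long): Unit = {
    if (entries.lastOption.forall(_.epoch < epoch))
      entries += EpochEntry(epoch, startOffset)
  }

  // End offset of `epoch`: the start offset of the next larger epoch,
  // or the log end offset when `epoch` is the latest one.
  def endOffsetFor(epoch: Int, logEndOffset: Long): Long =
    entries.find(_.epoch > epoch).map(_.startOffset).getOrElse(logEndOffset)
}

object EpochCacheSketch {
  // Returns the end offset reported for epoch 4 by the naive and guarded variants.
  def run(): (Long, Long) = {
    val naive = new SimpleEpochCache
    naive.assignNaive(4, 0L)
    naive.assignNaive(5, 100L)
    naive.assignNaive(5, 150L) // same epoch applied again, no bump

    val guarded = new SimpleEpochCache
    guarded.assignGuarded(4, 0L)
    guarded.assignGuarded(5, 100L)
    guarded.assignGuarded(5, 150L) // ignored: epoch 5 is already recorded

    (naive.endOffsetFor(4, 150L), guarded.endOffsetFor(4, 150L))
  }

  def main(args: Array[String]): Unit = println(run())
}
```

In this model the naive variant reports 150 as the end of epoch 4 while the guarded variant keeps 100, so a follower still on epoch 4 would be told to keep records past the point where epoch 5 actually began.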



