Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/08/03 22:54:57 UTC

[GitHub] [kafka] jolshan opened a new pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

jolshan opened a new pull request #11171:
URL: https://github.com/apache/kafka/pull/11171


   Most of [KAFKA-13132](https://issues.apache.org/jira/browse/KAFKA-13132) has been resolved, but one part of one case is still not covered.
   From the ticket:
   `2. We only assign the topic ID when we are associating the log with the partition in replicamanager for the first time`
   
   We covered the case where the log already exists and the leader epoch is _equal_ (i.e., nothing changes besides the topic ID), but we don't cover the update case where the leader epoch is bumped and the log is already associated with the partition.
   
   This PR ensures we correctly assign the topic ID in the makeLeaders/makeFollowers path when the log already exists.
   I've also added a test for the bumped leader epoch scenario.
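To make the two cases above concrete, here is a toy model, not Kafka code. `ReplicaState` and `handleLeaderAndIsr` are hypothetical names, `java.util.UUID` stands in for Kafka's `Uuid`, and a replica is reduced to its leader epoch, whether its log exists, and the log's topic ID. It only sketches the intended outcome: whether the epoch is equal or bumped, an existing log ends up carrying the topic ID from the request. Conflicting-ID validation, stale-epoch errors, and the rest of ReplicaManager's work are deliberately omitted.

```scala
import java.util.UUID

// Hypothetical, heavily simplified view of one replica (not a Kafka class).
final case class ReplicaState(leaderEpoch: Int, logExists: Boolean, logTopicId: Option[UUID])

// Simplified handling of one partition in a LeaderAndIsr request (illustrative only).
def handleLeaderAndIsr(state: ReplicaState, requestEpoch: Int, requestTopicId: UUID): ReplicaState = {
  if (requestEpoch > state.leaderEpoch) {
    // Bumped epoch (makeLeaders/makeFollowers path): the log may already exist,
    // so the topic ID must be assigned here as well; this is the gap the PR closes.
    state.copy(
      leaderEpoch = requestEpoch,
      logExists = true,
      logTopicId = state.logTopicId.orElse(Some(requestTopicId)))
  } else if (requestEpoch == state.leaderEpoch && state.logExists) {
    // Equal epoch: only the topic ID may be new (the case covered before this PR).
    state.copy(logTopicId = state.logTopicId.orElse(Some(requestTopicId)))
  } else {
    // Older epochs are simply left unchanged in this sketch; real error handling is omitted.
    state
  }
}
```

A test for the bumped-epoch scenario would start from a state with `logExists = true` and `logTopicId = None`, send a higher epoch, and check that the topic ID is now set.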
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] jolshan edited a comment on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

Posted by GitBox <gi...@apache.org>.
jolshan edited a comment on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311


   Just tested locally and something is still wrong here. Need to follow up.





[GitHub] [kafka] jolshan removed a comment on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

jolshan removed a comment on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311


   Just tested locally and something is still wrong here. Need to follow up.





[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

hachikuji commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r683680618



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -553,6 +553,17 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+    // defensively check that any newly assign topic ID matches any that is already set
+    _topicId.foreach { current =>
+      if (!current.equals(topicId)) {
+        // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower

Review comment:
       nit: I think it's OK to leave this comment out. The point of adding the check is to reduce the coupling with `ReplicaManager`.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -553,6 +553,17 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+    // defensively check that any newly assign topic ID matches any that is already set
+    _topicId.foreach { current =>
+      if (!current.equals(topicId)) {
+        // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower
+        throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
+          s"but log already contained topic ID $current")
+      }
+      // Topic ID already assigned so we can return
+      return

Review comment:
       I was thinking about how we could avoid this `return`. How about something like this:
   ```scala
       _topicId match {
         case Some(currentId) =>
           // An ID is already set: accept the same ID, reject a different one
           if (!currentId.equals(topicId)) {
             throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
               s"but log already contained topic ID $currentId")
           }
   
         case None =>
           // No ID yet: record it and persist it to the partition metadata file if needed
           if (keepPartitionMetadataFile) {
             _topicId = Some(topicId)
             if (!partitionMetadataFile.exists()) {
               partitionMetadataFile.record(topicId)
               scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
             }
           }
       }
   ```
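For illustration, here is a self-contained sketch of the same assign-or-validate idea outside Kafka. Assumptions: `SketchLog` is a made-up class, `java.util.UUID` replaces Kafka's `Uuid`, a local `InconsistentTopicIdException` stands in for the exception used in the diff, and the partition metadata file and flush scheduling are left out. Assigning the same ID twice is a no-op and a different ID throws, so no early `return` is needed.

```scala
import java.util.UUID

// Local stand-in for the InconsistentTopicIdException used in the diff.
final class InconsistentTopicIdException(msg: String) extends RuntimeException(msg)

// Hypothetical, heavily simplified log that only tracks its topic ID.
final class SketchLog(val topicPartition: String, initialTopicId: Option[UUID] = None) {
  @volatile private var _topicId: Option[UUID] = initialTopicId

  def topicId: Option[UUID] = _topicId

  def assignTopicId(topicId: UUID): Unit = {
    _topicId match {
      case Some(currentId) =>
        // Re-assigning the same ID is a no-op; a different ID is rejected.
        if (!currentId.equals(topicId))
          throw new InconsistentTopicIdException(
            s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
              s"but log already contained topic ID $currentId")
      case None =>
        // First assignment: record the ID (metadata file handling omitted).
        _topicId = Some(topicId)
    }
  }
}
```

With both branches expressed in one `match`, the "already assigned" and "not yet assigned" paths are explicit, which is the readability gain the suggestion is after.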







[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

hachikuji commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r683019148



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+    // defensively check that any newly assign topic ID matches any that is already set
+    _topicId.foreach { current =>
+      if (!current.equals(topicId))

Review comment:
       Can we return early if the current topic ID is already defined and matches the provided `topicId`?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+    // defensively check that any newly assign topic ID matches any that is already set
+    _topicId.foreach { current =>
+      if (!current.equals(topicId))
+      // we should never get here as the topic IDs should have been checked in becomeLeaderOrFollower

Review comment:
       nit: fix the alignment (just use braces 😉, I won't tell anyone)








[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

jolshan commented on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893066311


   Just tested and something is still wrong here. Need to follow up.





[GitHub] [kafka] hachikuji commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

hachikuji commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r682206623



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-    isFutureReplica match {
-      case true if futureLog.isEmpty =>
-        val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+    val logOpt = if (isFutureReplica) futureLog else log
+    if (logOpt.isEmpty) {
+      val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+      if (isFutureReplica)
         this.futureLog = Option(log)
-      case false if log.isEmpty =>
-        val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+      else
         this.log = Option(log)
-      case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+    } else {
+      trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+      logOpt.foreach { log =>
+        if (log.topicId.isEmpty) {

Review comment:
       By the time we get here, I think we have already validated that the topic ID is consistent. Nevertheless, I wonder if it makes sense to let `assignTopicId` validate the passed `topicId`. Currently it just overwrites the value.

##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-    isFutureReplica match {
-      case true if futureLog.isEmpty =>
-        val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+    val logOpt = if (isFutureReplica) futureLog else log

Review comment:
       Is this any better?
   ```scala
     def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
       def maybeCreate(logOpt: Option[Log]): Log = {
         logOpt match {
           case Some(log) =>
             trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
             topicId.foreach(log.assignTopicId)
             log
           case None =>
             createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
         }
       }
   
       if (isFutureReplica) {
         this.futureLog = Some(maybeCreate(this.futureLog))
       } else {
         this.log = Some(maybeCreate(this.log))
       }
     }
   ```
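Here is a runnable toy version of this get-or-create shape, under stated assumptions: `ToyLog` and `ToyPartition` are hypothetical classes that model only the topic ID, `java.util.UUID` stands in for `Uuid`, and `require` (which throws `IllegalArgumentException`) stands in for the real inconsistency check. An existing log is reused and receives the topic ID, while a missing log is created with it.

```scala
import java.util.UUID

// Hypothetical log: only the topic ID is modelled.
final class ToyLog(initialTopicId: Option[UUID]) {
  private var _topicId: Option[UUID] = initialTopicId
  def topicId: Option[UUID] = _topicId

  // Assign the ID if none is set; reject a conflicting ID.
  def assignTopicId(id: UUID): Unit = _topicId match {
    case Some(current) =>
      require(current == id, s"inconsistent topic ID: have $current, got $id")
    case None =>
      _topicId = Some(id)
  }
}

// Hypothetical partition holding a current log and an optional future log.
final class ToyPartition {
  var log: Option[ToyLog] = None
  var futureLog: Option[ToyLog] = None
  var createdLogs = 0 // counts createLog calls, to show that existing logs are reused

  private def createLog(topicId: Option[UUID]): ToyLog = {
    createdLogs += 1
    new ToyLog(topicId)
  }

  def createLogIfNotExists(isFutureReplica: Boolean, topicId: Option[UUID]): Unit = {
    def maybeCreate(logOpt: Option[ToyLog]): ToyLog = logOpt match {
      case Some(existing) =>
        // Log already exists (e.g. after a leader epoch bump): assign the ID if one was sent.
        topicId.foreach(existing.assignTopicId)
        existing
      case None =>
        createLog(topicId)
    }

    if (isFutureReplica) futureLog = Some(maybeCreate(futureLog))
    else log = Some(maybeCreate(log))
  }
}
```

Because `maybeCreate` returns a log in both branches, each field is written in exactly one place, which removes the duplicated `createLog` calls of the original `match`.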







[GitHub] [kafka] jolshan commented on a change in pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

jolshan commented on a change in pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#discussion_r682739288



##########
File path: core/src/main/scala/kafka/cluster/Partition.scala
##########
@@ -313,14 +313,20 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-    isFutureReplica match {
-      case true if futureLog.isEmpty =>
-        val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+    val logOpt = if (isFutureReplica) futureLog else log
+    if (logOpt.isEmpty) {
+      val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+      if (isFutureReplica)
         this.futureLog = Option(log)
-      case false if log.isEmpty =>
-        val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+      else
         this.log = Option(log)
-      case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+    } else {
+      trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+      logOpt.foreach { log =>
+        if (log.topicId.isEmpty) {

Review comment:
       I'm a little unsure what you mean by the topic ID being consistent here (there would have been no topic ID in the log to compare against). We can add an extra check in `assignTopicId` to ensure there is no way to assign an inconsistent topic ID.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -553,6 +553,14 @@ class Log(@volatile var logStartOffset: Long,
 
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
+    // defensively check that any newly assign topic ID matches any that is already set
+    _topicId.foreach { current =>
+      if (!current.equals(topicId))

Review comment:
       We can. I was just looking at this case again. 








[GitHub] [kafka] hachikuji merged pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

hachikuji merged pull request #11171:
URL: https://github.com/apache/kafka/pull/11171


   






[GitHub] [kafka] jolshan commented on pull request #11171: KAFKA-13132: Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2)

jolshan commented on pull request #11171:
URL: https://github.com/apache/kafka/pull/11171#issuecomment-893071380


   Ok, tested locally (with the correct jar this time) and it looked good.




