Posted to commits@kafka.apache.org by jg...@apache.org on 2021/08/06 17:06:56 UTC

[kafka] branch 3.0 updated: KAFKA-13132; Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2) (#11171)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 93c68fe  KAFKA-13132; Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2) (#11171)
93c68fe is described below

commit 93c68fe9c57ed8e0d619277b7a78116b11814c50
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Thu Aug 5 18:10:48 2021 -0700

    KAFKA-13132; Upgrading to topic IDs in LISR requests has gaps introduced in 3.0 (part 2) (#11171)
    
    Most of [KAFKA-13132](https://issues.apache.org/jira/browse/KAFKA-13132) has been resolved, but one part of one case is still not covered.
    From the ticket:
    `2. We only assign the topic ID when we are associating the log with the partition in replicamanager for the first time`
    
    We covered the case where the log already exists and the leader epoch is _equal_ (i.e., no updates besides the topic ID), but we did not cover the update case where the leader epoch is bumped and the log is already associated with the partition.
    
    This PR ensures we correctly assign the topic ID in the makeLeaders/makeFollowers path when the log already exists.
    I've also added a test for the bumped leader epoch scenario.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala  | 24 ++++++++++-----
 core/src/main/scala/kafka/log/Log.scala            | 21 +++++++++----
 .../unit/kafka/server/ReplicaManagerTest.scala     | 36 +++++++++++++++++-----
 3 files changed, 59 insertions(+), 22 deletions(-)
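
To make the gap concrete, here is a minimal, self-contained Scala sketch of the behaviour the patch below introduces. The `SketchLog` and `SketchPartition` types are illustrative stand-ins, not Kafka's actual classes: the point is only that `createLogIfNotExists` now propagates the topic ID to a log that already exists (e.g. on a leader-epoch bump), instead of assigning it only when the log is first created.

```scala
import java.util.UUID

// Simplified stand-in for kafka.log.Log (illustrative only).
final class SketchLog(val topicPartition: String) {
  private var _topicId: Option[UUID] = None
  def topicId: Option[UUID] = _topicId

  // Idempotent for a matching ID; fails loudly on a conflicting one.
  def assignTopicId(id: UUID): Unit = _topicId match {
    case Some(current) if current != id =>
      throw new IllegalStateException(
        s"Tried to assign topic ID $id to $topicPartition, but it already has $current")
    case Some(_) => ()                 // same ID already assigned: nothing to do
    case None    => _topicId = Some(id)
  }
}

// Simplified stand-in for Partition.createLogIfNotExists (illustrative only).
final class SketchPartition(val topicPartition: String) {
  var log: Option[SketchLog] = None

  def createLogIfNotExists(topicId: Option[UUID]): Unit = log match {
    case Some(existing) =>
      // The fix: propagate the topic ID to a log that already exists,
      // e.g. when a later LeaderAndIsr request bumps the leader epoch.
      if (existing.topicId.isEmpty) topicId.foreach(existing.assignTopicId)
    case None =>
      val newLog = new SketchLog(topicPartition)
      topicId.foreach(newLog.assignTopicId)
      log = Some(newLog)
  }
}

object TopicIdUpgradeSketch extends App {
  val partition = new SketchPartition("topic-0")
  partition.createLogIfNotExists(topicId = None)        // older request: no topic ID yet
  val id = UUID.randomUUID()
  partition.createLogIfNotExists(topicId = Some(id))    // later request carries a topic ID
  assert(partition.log.flatMap(_.topicId).contains(id)) // the existing log picked up the ID
}
```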

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index e5f0a91..506e665 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -313,14 +313,22 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
-    isFutureReplica match {
-      case true if futureLog.isEmpty =>
-        val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
-        this.futureLog = Option(log)
-      case false if log.isEmpty =>
-        val log = createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
-        this.log = Option(log)
-      case _ => trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+    def maybeCreate(logOpt: Option[Log]): Log = {
+      logOpt match {
+        case Some(log) =>
+          trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
+          if (log.topicId.isEmpty)
+            topicId.foreach(log.assignTopicId)
+          log
+        case None =>
+          createLog(isNew, isFutureReplica, offsetCheckpoints, topicId)
+      }
+    }
+
+    if (isFutureReplica) {
+      this.futureLog = Some(maybeCreate(this.futureLog))
+    } else {
+      this.log = Some(maybeCreate(this.log))
     }
   }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 350200b..eca4be8 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -574,12 +574,21 @@ class Log(@volatile private var _dir: File,
 
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
-    if (keepPartitionMetadataFile) {
-      _topicId = Some(topicId)
-      if (!partitionMetadataFile.exists()) {
-        partitionMetadataFile.record(topicId)
-        scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
-      }
+    _topicId match {
+      case Some(currentId) =>
+        if (!currentId.equals(topicId)) {
+          throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
+            s"but log already contained topic ID $currentId")
+        }
+
+      case None =>
+        if (keepPartitionMetadataFile) {
+          _topicId = Some(topicId)
+          if (!partitionMetadataFile.exists()) {
+            partitionMetadataFile.record(topicId)
+            scheduler.schedule("flush-metadata-file", maybeFlushMetadataFile)
+          }
+        }
     }
   }
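
As a quick illustration of the new `assignTopicId` semantics in the hunk above, here is a hedged sketch using plain Scala stand-ins rather than the actual `Log` class: a repeated assignment with the same ID is a no-op, while a conflicting ID is rejected, mirroring the `InconsistentTopicIdException` path.

```scala
import java.util.UUID

// Minimal model of the assignment rules (not Kafka's Log API).
object AssignTopicIdSemantics extends App {
  private var topicId: Option[UUID] = None

  def assignTopicId(id: UUID): Unit = topicId match {
    case Some(current) if current != id =>
      // Mirrors the InconsistentTopicIdException case above.
      throw new IllegalStateException(s"log already has topic ID $current, refusing $id")
    case Some(_) => ()                  // same ID: idempotent, nothing to write
    case None    => topicId = Some(id)  // first assignment (Kafka also records it in the partition metadata file)
  }

  val id = UUID.randomUUID()
  assignTopicId(id)                      // first assignment succeeds
  assignTopicId(id)                      // repeat with the same ID: no-op
  try assignTopicId(UUID.randomUUID())   // conflicting ID: rejected
  catch { case e: IllegalStateException => println(s"rejected as expected: ${e.getMessage}") }
}
```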
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index f92b2d2..cab00c0 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -1227,6 +1227,7 @@ class ReplicaManagerTest {
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
     replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
     val partition0Replicas = Seq[Integer](0, 1).asJava
+    val topicId = Uuid.randomUuid()
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
@@ -1239,7 +1240,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
+      Collections.singletonMap(tp0.topic, topicId),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1260,7 +1261,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
+      Collections.singletonMap(tp0.topic, topicId),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
@@ -1277,6 +1278,7 @@ class ReplicaManagerTest {
     val offsetCheckpoints = new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints)
     replicaManager.createPartition(tp0).createLogIfNotExists(isNew = false, isFutureReplica = false, offsetCheckpoints, None)
     val partition0Replicas = Seq[Integer](0, 1).asJava
+    val topicId = Uuid.randomUuid()
 
     val becomeLeaderRequest = new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, brokerEpoch,
       Seq(new LeaderAndIsrPartitionState()
@@ -1289,7 +1291,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
+      Collections.singletonMap(tp0.topic, topicId),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(1, becomeLeaderRequest, (_, _) => ())
 
@@ -1311,7 +1313,7 @@ class ReplicaManagerTest {
         .setZkVersion(0)
         .setReplicas(partition0Replicas)
         .setIsNew(true)).asJava,
-      Collections.singletonMap(tp0.topic, Uuid.randomUuid()),
+      Collections.singletonMap(tp0.topic, topicId),
       Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
@@ -2358,19 +2360,20 @@ class ReplicaManagerTest {
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new MockTimer(time))
     try {
       val brokerList = Seq[Integer](0, 1).asJava
-      val topicPartition = new TopicPartition(topic, 0)
       val topicId = Uuid.randomUuid()
       val topicIds = Collections.singletonMap(topic, topicId)
       val topicNames = Collections.singletonMap(topicId, topic)
+      val topicPartition = new TopicPartition(topic, 0)
+      val topicPartition2 = new TopicPartition(topic, 1)
 
-      def leaderAndIsrRequest(topicIds: util.Map[String, Uuid], version: Short): LeaderAndIsrRequest =
+      def leaderAndIsrRequest(topicIds: util.Map[String, Uuid], version: Short, partition: Int = 0, leaderEpoch: Int = 0): LeaderAndIsrRequest =
         new LeaderAndIsrRequest.Builder(version, 0, 0, brokerEpoch,
         Seq(new LeaderAndIsrPartitionState()
           .setTopicName(topic)
-          .setPartitionIndex(0)
+          .setPartitionIndex(partition)
           .setControllerEpoch(0)
           .setLeader(0)
-          .setLeaderEpoch(0)
+          .setLeaderEpoch(leaderEpoch)
           .setIsr(brokerList)
           .setZkVersion(0)
           .setReplicas(brokerList)
@@ -2393,7 +2396,24 @@ class ReplicaManagerTest {
       assertTrue(log.topicId.isDefined)
       assertEquals(topicId, log.topicId.get)
 
+      // Repeat with partition 2, but in this case, update the leader epoch
+      // Send a request without a topic ID so that we have a log without a topic ID associated to the partition.
+      val response3 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(Collections.emptyMap(), 4, 1), (_, _) => ())
+      assertEquals(Errors.NONE, response3.partitionErrors(Collections.emptyMap()).get(topicPartition2))
+      assertTrue(replicaManager.localLog(topicPartition2).isDefined)
+      val log2 = replicaManager.localLog(topicPartition2).get
+      assertFalse(log2.partitionMetadataFile.exists())
+      assertTrue(log2.topicId.isEmpty)
+
+      val response4 = replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(topicIds, ApiKeys.LEADER_AND_ISR.latestVersion, 1, 1), (_, _) => ())
+      assertEquals(Errors.NONE, response4.partitionErrors(topicNames).get(topicPartition2))
+      assertTrue(replicaManager.localLog(topicPartition2).isDefined)
+      assertTrue(log2.partitionMetadataFile.exists())
+      assertTrue(log2.topicId.isDefined)
+      assertEquals(topicId, log2.topicId.get)
+
       assertEquals(topicId, log.partitionMetadataFile.read().topicId)
+      assertEquals(topicId, log2.partitionMetadataFile.read().topicId)
     } finally replicaManager.shutdown(checkpointHW = false)
   }