Posted to commits@kafka.apache.org by da...@apache.org on 2021/06/18 07:26:28 UTC

[kafka] branch trunk updated: KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update) (#10754)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 195a8b0  KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update) (#10754)
195a8b0 is described below

commit 195a8b0ed153a711d88e51b9dff66d27cc451776
Author: Justine Olshan <jo...@confluent.io>
AuthorDate: Fri Jun 18 00:25:03 2021 -0700

    KAFKA-12835: Topic IDs can mismatch on brokers (after interbroker protocol version update) (#10754)
    
    Upon upgrading to IBP 2.8, a topic's ID can end up being reassigned, which can cause errors in LeaderAndIsr handling when partition metadata files from the previous ID are still on the broker.
    
    Topic IDs are stored in the TopicZNode. Before this fix, the code behaved as follows:
    Consider a controller with IBP 2.8+. Each topic is assigned a topic ID, and LeaderAndIsr requests write partition.metadata files to the brokers. If the controller is re-elected and the new controller runs an older IBP version, reassigning partitions overwrites the TopicZNode and the topic ID is lost. Upon electing a 2.8+ IBP controller again, it sees that the TopicZNode is missing a topic ID and generates a new one. If a broker still has the old partition metadata file, the topic ID in that file will not match the newly generated one, and LeaderAndIsr handling fails with an inconsistent topic ID error.
    
    This patch changes the controller logic so that the topic ID is maintained in the controller context and in the ZNode even when IBP < 2.8. In the scenario above, reassigning partitions therefore no longer loses the topic ID.
    
    Topic IDs may still be lost when downgrading the code below version 2.8, but upon re-upgrading to code version 2.8+, all partition.metadata files are deleted before the IBP is bumped, to prevent any such errors.
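
    To make the failure mode concrete, the following sketch models the mismatch described above. It is illustrative only, not code from this patch: StaleCheck and its fields are invented names, java.util.UUID stands in for Kafka's Uuid type, and the real check lives in Log.initializeTopicId in the diff below.

        import java.util.UUID

        object StaleCheck {
          // ID persisted in partition.metadata before the controller bounce
          val fileTopicId: UUID = UUID.randomUUID()
          // ID freshly generated by a 2.8+ controller after the ZNode lost its ID
          val requestTopicId: UUID = UUID.randomUUID()

          def main(args: Array[String]): Unit = {
            // Mirrors the InconsistentTopicIdException the broker raises
            if (fileTopicId != requestTopicId)
              println(s"inconsistent topic ID: file=$fileTopicId, request=$requestTopicId")
          }
        }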
    
    Reviewers: Lucas Bradstreet <lu...@confluent.io>, David Jacot <dj...@confluent.io>
---
 .../scala/kafka/controller/KafkaController.scala   | 20 ++++--
 core/src/main/scala/kafka/log/Log.scala            | 58 ++++++++++------
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  2 +-
 .../controller/ControllerIntegrationTest.scala     | 78 ++++++++++++++++++++++
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 25 ++++++-
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |  5 +-
 6 files changed, 158 insertions(+), 30 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 654649b..5b463a6 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1658,10 +1658,22 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def processTopicIds(topicIdAssignments: Set[TopicIdReplicaAssignment]): Unit = {
-    if (config.usesTopicId) {
-      val updated = zkClient.setTopicIds(topicIdAssignments.filter(_.topicId.isEmpty), controllerContext.epochZkVersion)
-      val allTopicIdAssignments = updated ++ topicIdAssignments.filter(_.topicId.isDefined)
-      allTopicIdAssignments.foreach(topicIdAssignment => controllerContext.addTopicId(topicIdAssignment.topic, topicIdAssignment.topicId.get))
+    // Create topic IDs for topics missing them if we are using topic IDs
+    // Otherwise, maintain what we have in the topicZNode
+    val updatedTopicIdAssignments = if (config.usesTopicId) {
+      val (withTopicIds, withoutTopicIds) = topicIdAssignments.partition(_.topicId.isDefined)
+      withTopicIds ++ zkClient.setTopicIds(withoutTopicIds, controllerContext.epochZkVersion)
+    } else {
+      topicIdAssignments
+    }
+
+    // Add topic IDs to controller context
+    // If we don't have IBP 2.8, but are running 2.8 code, put any topic IDs from the ZNode in controller context
+    // This is to avoid losing topic IDs during operations like partition reassignments while the cluster is in a mixed state
+    updatedTopicIdAssignments.foreach { topicIdAssignment =>
+      topicIdAssignment.topicId.foreach { topicId =>
+        controllerContext.addTopicId(topicIdAssignment.topic, topicId)
+      }
     }
   }
 
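For readers skimming the controller change above, here is a minimal, runnable model of the new processTopicIds flow. All names are simplified stand-ins (Assignment replaces TopicIdReplicaAssignment, setTopicIds stubs the ZK write), not the actual Kafka API. The point is the shape of the logic: fresh IDs are generated only when the IBP supports topic IDs, but IDs already present in the ZNode are always carried into the controller context.

    import java.util.UUID

    case class Assignment(topic: String, topicId: Option[UUID])

    object ProcessTopicIdsSketch {
      // Stub for zkClient.setTopicIds: assign fresh IDs to assignments missing one
      def setTopicIds(missing: Set[Assignment]): Set[Assignment] =
        missing.map(a => a.copy(topicId = Some(UUID.randomUUID())))

      def processTopicIds(assignments: Set[Assignment], usesTopicId: Boolean): Map[String, UUID] = {
        val updated = if (usesTopicId) {
          val (withIds, withoutIds) = assignments.partition(_.topicId.isDefined)
          withIds ++ setTopicIds(withoutIds)
        } else {
          assignments // IBP < 2.8: keep whatever the TopicZNode already holds
        }
        // Existing IDs reach the controller context even when IBP < 2.8
        updated.flatMap(a => a.topicId.map(id => a.topic -> id)).toMap
      }

      def main(args: Array[String]): Unit = {
        val existing = Assignment("t", Some(UUID.randomUUID()))
        // With an old IBP, a pre-existing ID now survives instead of being dropped
        println(processTopicIds(Set(existing), usesTopicId = false))
      }
    }
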
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a253c17..35ab50c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -243,12 +243,12 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  *                         with the provided logStartOffset and nextOffsetMetadata
  * @param producerStateManager The ProducerStateManager instance containing state associated with the provided segments
  * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle log directory failure
- * @param topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
+ * @param _topicId optional Uuid to specify the topic ID for the topic if it exists. Should only be specified when
  *                first creating the log through Partition.makeLeader or Partition.makeFollower. When reloading a log,
  *                this field will be populated by reading the topic ID value from partition.metadata if it exists.
  * @param keepPartitionMetadataFile boolean flag to indicate whether the partition.metadata file should be kept in the
  *                                  log directory. A partition.metadata file is only created when the raft controller is used
- *                                  or the ZK controller's inter-broker protocol version is at least 2.8.
+ *                                  or the ZK controller is used and this broker's inter-broker protocol version is at least 2.8.
  *                                  This file will persist the topic ID on the broker. If inter-broker protocol for a ZK controller
  *                                  is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade.
  *                                  If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
@@ -315,27 +315,41 @@ class Log(@volatile private var _dir: File,
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
     maybeIncrementFirstUnstableOffset()
-    // Delete partition metadata file if the version does not support topic IDs.
-    // Recover topic ID if present and topic IDs are supported
-    // If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist
-    // write to the partition metadata file.
-    // Ensure we do not try to assign a provided topicId that is inconsistent with the ID on file.
+    initializeTopicId()
+  }
+
+  /**
+   * Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId.
+   * Delete partition metadata file if the version does not support topic IDs.
+   * Set _topicId based on a few scenarios:
+   *   - Recover topic ID if present and topic IDs are supported. Ensure we do not try to assign a provided topicId that is inconsistent
+   *     with the ID on file.
+   *   - If we were provided a topic ID when creating the log, partition metadata files are supported, and one does not yet exist,
+   *     set _topicId and write to the partition metadata file.
+   *   - Otherwise set _topicId to None
+   */
+  def initializeTopicId(): Unit = {
     if (partitionMetadataFile.exists()) {
-        if (keepPartitionMetadataFile) {
-          val fileTopicId = partitionMetadataFile.read().topicId
-          if (_topicId.isDefined && !_topicId.contains(fileTopicId))
-            throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition," +
-              s"but log already contained topic ID $fileTopicId")
-          _topicId = Some(fileTopicId)
-        } else {
-          try partitionMetadataFile.delete()
-          catch {
-            case e: IOException =>
-              error(s"Error while trying to delete partition metadata file ${partitionMetadataFile}", e)
-          }
+      if (keepPartitionMetadataFile) {
+        val fileTopicId = partitionMetadataFile.read().topicId
+        if (_topicId.isDefined && !_topicId.contains(fileTopicId))
+          throw new InconsistentTopicIdException(s"Tried to assign topic ID $topicId to log for topic partition $topicPartition, " +
+            s"but log already contained topic ID $fileTopicId")
+
+        _topicId = Some(fileTopicId)
+
+      } else {
+        try partitionMetadataFile.delete()
+        catch {
+          case e: IOException =>
+            error(s"Error while trying to delete partition metadata file ${partitionMetadataFile}", e)
         }
+      }
     } else if (keepPartitionMetadataFile) {
       _topicId.foreach(partitionMetadataFile.write)
+    } else {
+      // We want to keep the file and the in-memory topic ID in sync.
+      _topicId = None
     }
   }
 
@@ -555,8 +569,10 @@ class Log(@volatile private var _dir: File,
 
   /** Only used for ZK clusters when we update and start using topic IDs on existing topics */
   def assignTopicId(topicId: Uuid): Unit = {
-    partitionMetadataFile.write(topicId)
-    _topicId = Some(topicId)
+    if (keepPartitionMetadataFile) {
+      partitionMetadataFile.write(topicId)
+      _topicId = Some(topicId)
+    }
   }
 
   private def initializeLeaderEpochCache(): Unit = lock synchronized {
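The branching in initializeTopicId above reduces to a small decision over three inputs: whether partition.metadata exists, whether keepPartitionMetadataFile is set, and whether a topic ID was provided. Below is a standalone sketch of just that decision, with file I/O replaced by in-memory Options; all names are invented for illustration.

    import java.util.UUID

    object TopicIdInitSketch {
      // fileId:     ID read from partition.metadata, if the file exists
      // providedId: ID passed in when the log was created, if any
      def resolve(fileId: Option[UUID],
                  providedId: Option[UUID],
                  keepMetadataFile: Boolean): Option[UUID] = {
        fileId match {
          case Some(onDisk) if keepMetadataFile =>
            // A provided ID must agree with the ID already on disk
            providedId.foreach { p =>
              require(p == onDisk, s"inconsistent topic ID: provided $p, on file $onDisk")
            }
            Some(onDisk)
          case Some(_) =>
            None // file not supported: it is deleted and the in-memory ID cleared
          case None if keepMetadataFile =>
            providedId // the provided ID (if any) is also written through to disk
          case None =>
            None // keep file and in-memory ID in sync: neither exists
        }
      }

      def main(args: Array[String]): Unit = {
        val id = UUID.randomUUID()
        assert(resolve(Some(id), Some(id), keepMetadataFile = true).contains(id))
        assert(resolve(Some(id), None, keepMetadataFile = false).isEmpty)
        assert(resolve(None, Some(id), keepMetadataFile = false).isEmpty)
      }
    }
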
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 87b8917..2a9e7a1 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -410,7 +410,7 @@ object KafkaMetadataLog {
       logDirFailureChannel = new LogDirFailureChannel(5),
       lastShutdownClean = false,
       topicId = Some(topicId),
-      keepPartitionMetadataFile = false
+      keepPartitionMetadataFile = true
     )
 
     val metadataLog = new KafkaMetadataLog(
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index f34acd9..1ff8c6f 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -1120,6 +1120,84 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
       "topic should have been removed from controller context after deletion")
   }
 
+  @Test
+  def testTopicIdUpgradeAfterReassigningPartitions(): Unit = {
+    val tp = new TopicPartition("t", 0)
+    val reassignment = Map(tp -> Some(Seq(0)))
+    val adminZkClient = new AdminZkClient(zkClient)
+
+    // start server with old IBP
+    servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+    // create the topic directly with the ZK client, without a topic ID
+    adminZkClient.createTopic(tp.topic, 1, 1)
+    waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+    val topicIdAfterCreate = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    val id = servers.head.kafkaController.controllerContext.topicIds.get(tp.topic)
+    assertTrue(topicIdAfterCreate.isEmpty)
+    assertEquals(topicIdAfterCreate, id,
+      "expected no topic ID, but one existed")
+
+    // Upgrade to IBP 2.8
+    servers(0).shutdown()
+    servers(0).awaitShutdown()
+    servers = makeServers(1)
+    waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon controller restart")
+    val topicIdAfterUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    assertEquals(topicIdAfterUpgrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
+      "expected same topic ID but it can not be found")
+    assertEquals(tp.topic(), servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+      "correct topic name expected but cannot be found in the controller context")
+
+    // Downgrade back to 2.7
+    servers(0).shutdown()
+    servers(0).awaitShutdown()
+    servers = makeServers(1, interBrokerProtocolVersion = Some(KAFKA_2_7_IV0))
+    waitForPartitionState(tp, firstControllerEpoch, 0, LeaderAndIsr.initialLeaderEpoch,
+      "failed to get expected partition state upon topic creation")
+    val topicIdAfterDowngrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    assertTrue(topicIdAfterDowngrade.isDefined)
+    assertEquals(topicIdAfterUpgrade, topicIdAfterDowngrade,
+      "expected same topic ID but it can not be found after downgrade")
+    assertEquals(topicIdAfterDowngrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
+      "expected same topic ID in controller context but it is no longer found after downgrade")
+    assertEquals(tp.topic(), servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+      "correct topic name expected but cannot be found in the controller context")
+
+    // Reassign partitions
+    servers(0).kafkaController.eventManager.put(ApiPartitionReassignment(reassignment, _ => ()))
+    waitForPartitionState(tp, 3, 0, 1,
+      "failed to get expected partition state upon controller restart")
+    val topicIdAfterReassignment = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    assertTrue(topicIdAfterReassignment.isDefined)
+    assertEquals(topicIdAfterUpgrade, topicIdAfterReassignment,
+      "expected same topic ID but it can not be found after reassignment")
+    assertEquals(topicIdAfterUpgrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
+      "expected same topic ID in controller context but is no longer found after reassignment")
+    assertEquals(tp.topic(), servers.head.kafkaController.controllerContext.topicNames(topicIdAfterUpgrade.get),
+      "correct topic name expected but cannot be found in the controller context")
+
+    // Upgrade back to 2.8
+    servers(0).shutdown()
+    servers(0).awaitShutdown()
+    servers = makeServers(1)
+    waitForPartitionState(tp, 3, 0, 1,
+      "failed to get expected partition state upon controller restart")
+    val topicIdAfterReUpgrade = zkClient.getTopicIdsForTopics(Set(tp.topic())).get(tp.topic())
+    assertEquals(topicIdAfterUpgrade, topicIdAfterReUpgrade,
+      "expected same topic ID but it can not be found after re-upgrade")
+    assertEquals(topicIdAfterReUpgrade, servers.head.kafkaController.controllerContext.topicIds.get(tp.topic),
+      "topic ID can not be found in controller context after re-upgrading IBP")
+    assertEquals(tp.topic(), servers.head.kafkaController.controllerContext.topicNames(topicIdAfterReUpgrade.get),
+      "correct topic name expected but cannot be found in the controller context")
+
+    adminZkClient.deleteTopic(tp.topic)
+    TestUtils.waitUntilTrue(() => servers.head.kafkaController.controllerContext.topicIds.get(tp.topic).isEmpty,
+      "topic ID for topic should have been removed from controller context after deletion")
+    assertTrue(servers.head.kafkaController.controllerContext.topicNames.get(topicIdAfterUpgrade.get).isEmpty)
+  }
+
   private def testControllerMove(fun: () => Unit): Unit = {
     val controller = getController().kafkaController
     val appender = LogCaptureAppender.createAndRegister()
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 0113db1..4a0674d 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1964,6 +1964,26 @@ class LogTest {
   }
 
   @Test
+  def testNoOpWhenKeepPartitionMetadataFileIsFalse(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = createLog(logDir, logConfig, keepPartitionMetadataFile = false)
+
+    val topicId = Uuid.randomUuid()
+    log.assignTopicId(topicId)
+    // We should not write to this file or set the topic ID
+    assertFalse(log.partitionMetadataFile.exists())
+    assertEquals(None, log.topicId)
+    log.close()
+
+    val log2 = createLog(logDir, logConfig, topicId = Some(Uuid.randomUuid()), keepPartitionMetadataFile = false)
+
+    // We should not write to this file or set the topic ID
+    assertFalse(log2.partitionMetadataFile.exists())
+    assertEquals(None, log2.topicId)
+    log2.close()
+  }
+
+  @Test
   def testLogFailsWhenInconsistentTopicIdSet(): Unit = {
     val logConfig = LogTestUtils.createLogConfig()
     var log = createLog(logDir, logConfig)
@@ -3476,9 +3496,10 @@ class LogTest {
                         maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
                         producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true,
-                        topicId: Option[Uuid] = None): Log = {
+                        topicId: Option[Uuid] = None,
+                        keepPartitionMetadataFile: Boolean = true): Log = {
     LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
-      maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, lastShutdownClean, topicId = topicId)
+      maxProducerIdExpirationMs, producerIdExpirationCheckIntervalMs, lastShutdownClean, topicId = topicId, keepPartitionMetadataFile = keepPartitionMetadataFile)
   }
 
   private def createLogWithOffsetOverflow(logConfig: LogConfig): (Log, LogSegment) = {
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 0169cd4..5af0f4d 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -84,7 +84,8 @@ object LogTestUtils {
                 maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
                 producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
                 lastShutdownClean: Boolean = true,
-                topicId: Option[Uuid] = None): Log = {
+                topicId: Option[Uuid] = None,
+                keepPartitionMetadataFile: Boolean = true): Log = {
     Log(dir = dir,
       config = config,
       logStartOffset = logStartOffset,
@@ -97,7 +98,7 @@ object LogTestUtils {
       logDirFailureChannel = new LogDirFailureChannel(10),
       lastShutdownClean = lastShutdownClean,
       topicId = topicId,
-      keepPartitionMetadataFile = true)
+      keepPartitionMetadataFile = keepPartitionMetadataFile)
   }
 
   /**