Posted to commits@kafka.apache.org by cm...@apache.org on 2024/01/16 23:57:33 UTC

(kafka) branch 3.6 updated: KAFKA-16120: Fix partition reassignment during ZK migration

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 7486223d9e3 KAFKA-16120: Fix partition reassignment during ZK migration
7486223d9e3 is described below

commit 7486223d9e39a48700bfb6dfdad93e06e4ab550f
Author: David Mao <dm...@confluent.io>
AuthorDate: Fri Jan 12 09:12:33 2024 -0800

    KAFKA-16120: Fix partition reassignment during ZK migration
    
    When we are migrating from ZK mode to KRaft mode, the brokers pass through a phase where they
    are running in ZK mode, but the controller is in KRaft mode (aka a kcontroller). This is called
    "hybrid mode." In hybrid mode, the KRaft controller sends old-style controller RPCs
    (StopReplicaRequest, LeaderAndIsrRequest, UpdateMetadataRequest, etc.) to the remaining
    ZK-mode brokers.
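
    As a rough illustration only (the names below are invented for this sketch and are not the
    actual MigrationPropagator API), hybrid mode amounts to translating each KRaft metadata change
    into the legacy RPCs that ZK-mode brokers still understand:

        // Hypothetical sketch of hybrid-mode RPC dispatch; illustrative names only.
        object HybridModeRpcSketch {
          sealed trait LegacyRpc
          case class UpdateMetadataRequest(brokers: Seq[Int]) extends LegacyRpc
          case class LeaderAndIsrRequest(replicas: Seq[Int], leaderEpoch: Int) extends LegacyRpc
          case class StopReplicaRequest(brokers: Seq[Int], deletePartition: Boolean) extends LegacyRpc

          // One partition's metadata change, reduced to what the sketch needs.
          case class PartitionDelta(oldReplicas: Set[Int], newReplicas: Set[Int], leaderEpoch: Int)

          def rpcsFor(delta: PartitionDelta, allZkBrokers: Seq[Int]): Seq[LegacyRpc] = {
            val removed = delta.oldReplicas -- delta.newReplicas
            val base = Seq(
              // Every ZK broker learns the new metadata; current replicas get leader/ISR state.
              UpdateMetadataRequest(allZkBrokers),
              LeaderAndIsrRequest(delta.newReplicas.toSeq, delta.leaderEpoch))
            if (removed.isEmpty) base
            else base :+ StopReplicaRequest(removed.toSeq, deletePartition = true)
          }
        }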
    
    To complete a partition reassignment, the kcontroller must send a StopReplicaRequest to any
    brokers that no longer host the partition in question. Previously, it sent this
    StopReplicaRequest with delete = false, which led to stray partitions: the partition data was
    never removed as it should have been. This PR changes it to send delete = true, fixing
    KAFKA-16120.
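
    A minimal sketch of why the flag matters on the receiving broker (a simplified model with
    invented names, not the real ReplicaManager code): delete = false only stops serving the
    partition, while delete = true also removes the local log.

        // Simplified model of a ZK-mode broker handling StopReplica; invented names.
        import scala.collection.mutable

        class StopReplicaSketch {
          private val localLogs = mutable.Set("test-0")          // partitions with data on disk
          private val servingPartitions = mutable.Set("test-0")  // partitions currently served

          def handleStopReplica(partition: String, deletePartition: Boolean): Unit = {
            servingPartitions -= partition   // always stop fetchers / leadership
            if (deletePartition) {
              localLogs -= partition         // delete = true: the on-disk log is removed
            }
            // delete = false: the log stays on disk, which after a reassignment is exactly
            // the "stray partition" described above (KAFKA-16120).
          }
        }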
    
    There is one additional problem with partition reassignment in hybrid mode, tracked as
    KAFKA-16121. In ZK mode, brokers ignore any LeaderAndIsr request whose partition leader epoch
    is less than or equal to the current partition leader epoch. However, in hybrid mode, just as
    in KRaft mode, we do not bump the leader epoch when starting a new reassignment (see
    `triggerLeaderEpochBumpIfNeeded`). This PR resolves that problem by adding a special case on
    the broker side when isKRaftController = true.
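
    The broker-side rule after this change boils down to the following predicate, a stripped-down
    restatement of the ReplicaManager condition in the diff below:

        // Accept a LeaderAndIsr request if its leader epoch is strictly newer, or if it is equal
        // and comes from a KRaft controller (which does not bump the epoch for a reassignment).
        def shouldApplyLeaderAndIsr(requestLeaderEpoch: Int,
                                    currentLeaderEpoch: Int,
                                    isKRaftController: Boolean): Boolean =
          requestLeaderEpoch > currentLeaderEpoch ||
            (requestLeaderEpoch == currentLeaderEpoch && isKRaftController)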
    
    Reviewers: Akhilesh Chaganti <ak...@users.noreply.github.com>, Colin P. McCabe <cm...@apache.org>
---
 .../kafka/migration/MigrationPropagator.scala      |  2 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  2 +-
 .../kafka/zk/ZkMigrationIntegrationTest.scala      | 84 +++++++++++++++++++++-
 3 files changed, 85 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/migration/MigrationPropagator.scala b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
index 2a02f5891ec..7bf0fc3ff56 100644
--- a/core/src/main/scala/kafka/migration/MigrationPropagator.scala
+++ b/core/src/main/scala/kafka/migration/MigrationPropagator.scala
@@ -207,7 +207,7 @@ class MigrationPropagator(
         val newReplicas = partitionRegistration.replicas.toSet
         val removedReplicas = oldReplicas -- newReplicas
         if (removedReplicas.nonEmpty) {
-          requestBatch.addStopReplicaRequestForBrokers(removedReplicas.toSeq, tp, deletePartition = false)
+          requestBatch.addStopReplicaRequestForBrokers(removedReplicas.toSeq, tp, deletePartition = true)
         }
       }
     }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 469321e14d5..63a6f049247 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1818,7 +1818,7 @@ class ReplicaManager(val config: KafkaConfig,
                   s" match the topic ID for partition $topicPartition received: " +
                   s"${requestTopicId.get}.")
                 responseMap.put(topicPartition, Errors.INCONSISTENT_TOPIC_ID)
-              } else if (requestLeaderEpoch > currentLeaderEpoch) {
+              } else if (requestLeaderEpoch > currentLeaderEpoch || (requestLeaderEpoch == currentLeaderEpoch && leaderAndIsrRequest.isKRaftController)) {
                 // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
                 // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
                 if (partitionState.replicas.contains(localBrokerId)) {
diff --git a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
index ec28484172a..4c3ddd8b80d 100644
--- a/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/zk/ZkMigrationIntegrationTest.scala
@@ -53,7 +53,7 @@ import org.slf4j.LoggerFactory
 
 import java.util
 import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit}
-import java.util.{Properties, UUID}
+import java.util.{Collections, Optional, Properties, UUID}
 import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
@@ -678,6 +678,88 @@ class ZkMigrationIntegrationTest {
     }
   }
 
+  @ClusterTest(clusterType = Type.ZK, brokers = 4, metadataVersion = MetadataVersion.IBP_3_6_IV2, serverProperties = Array(
+    new ClusterConfigProperty(key = "inter.broker.listener.name", value = "EXTERNAL"),
+    new ClusterConfigProperty(key = "listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "advertised.listeners", value = "PLAINTEXT://localhost:0,EXTERNAL://localhost:0"),
+    new ClusterConfigProperty(key = "listener.security.protocol.map", value = "EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"),
+  ))
+  def testPartitionReassignmentInHybridMode(zkCluster: ClusterInstance): Unit = {
+    // Create a topic in ZK mode
+    val topicName = "test"
+    var admin = zkCluster.createAdminClient()
+    val zkClient = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying().zkClient
+
+    // Bootstrap the ZK cluster ID into KRaft
+    val clusterId = zkCluster.clusterId()
+    val kraftCluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_6_IV2).
+        setClusterId(Uuid.fromString(clusterId)).
+        setNumBrokerNodes(0).
+        setNumControllerNodes(1).build())
+      .setConfigProp(KafkaConfig.MigrationEnabledProp, "true")
+      .setConfigProp(KafkaConfig.ZkConnectProp, zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.zkConnect)
+      .build()
+    try {
+      kraftCluster.format()
+      kraftCluster.startup()
+      val readyFuture = kraftCluster.controllers().values().asScala.head.controller.waitForReadyBrokers(3)
+
+      // Enable migration configs and restart brokers
+      log.info("Restart brokers in migration mode")
+      val clientProps = kraftCluster.controllerClientProperties()
+      val voters = clientProps.get(RaftConfig.QUORUM_VOTERS_CONFIG)
+      zkCluster.config().serverProperties().put(KafkaConfig.MigrationEnabledProp, "true")
+      zkCluster.config().serverProperties().put(RaftConfig.QUORUM_VOTERS_CONFIG, voters)
+      zkCluster.config().serverProperties().put(KafkaConfig.ControllerListenerNamesProp, "CONTROLLER")
+      zkCluster.config().serverProperties().put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT")
+      zkCluster.rollingBrokerRestart()
+      zkCluster.waitForReadyBrokers()
+      readyFuture.get(30, TimeUnit.SECONDS)
+
+      // Wait for migration to begin
+      log.info("Waiting for ZK migration to begin")
+      TestUtils.waitUntilTrue(
+        () => zkClient.getControllerId.contains(3000),
+        "Timed out waiting for KRaft controller to take over",
+        30000)
+
+      // Create a topic with replicas on brokers 0, 1, 2
+      log.info("Create new topic with AdminClient")
+      admin = zkCluster.createAdminClient()
+      val newTopics = new util.ArrayList[NewTopic]()
+      val replicaAssignment = Collections.singletonMap(Integer.valueOf(0), Seq(0, 1, 2).map(int2Integer).asJava)
+      newTopics.add(new NewTopic(topicName, replicaAssignment))
+      val createTopicResult = admin.createTopics(newTopics)
+      createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+      val topicPartition = new TopicPartition(topicName, 0)
+
+      // Verify the changes made to KRaft are seen in ZK
+      verifyTopicPartitionMetadata(topicName, Seq(topicPartition), zkClient)
+
+      // Reassign replicas to brokers 1, 2, 3 and wait for reassignment to complete
+      admin.alterPartitionReassignments(Collections.singletonMap(topicPartition,
+        Optional.of(new NewPartitionReassignment(Seq(1, 2, 3).map(int2Integer).asJava)))).all().get()
+
+      TestUtils.waitUntilTrue(() => {
+        val listPartitionReassignmentsResult = admin.listPartitionReassignments().reassignments().get()
+        listPartitionReassignmentsResult.isEmpty
+      }, "Timed out waiting for reassignments to complete.")
+
+      // Verify that the partition is removed from broker 0
+      TestUtils.waitUntilTrue(() => {
+        val brokers = zkCluster.asInstanceOf[ZkClusterInstance].getUnderlying.brokers
+        assertTrue(brokers.size == 4)
+        assertTrue(brokers.head.config.brokerId == 0)
+        brokers.head.replicaManager.onlinePartition(topicPartition).isEmpty
+      }, "Timed out waiting for removed replica reassignment to be marked offline")
+    } finally {
+      shutdownInSequence(zkCluster, kraftCluster)
+    }
+  }
+
   def verifyTopicPartitionMetadata(topicName: String, partitions: Seq[TopicPartition], zkClient: KafkaZkClient): Unit = {
     val (topicIdReplicaAssignment, success) = TestUtils.computeUntilTrue(
       zkClient.getReplicaAssignmentAndTopicIdForTopics(Set(topicName)).headOption) {