Posted to commits@kafka.apache.org by jg...@apache.org on 2020/10/14 01:08:20 UTC

[kafka] branch 2.7 updated: KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected (#9353)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 63c8919  KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected (#9353)
63c8919 is described below

commit 63c891935d2b7a150a088942803a28fec7b29f94
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Oct 13 17:53:47 2020 -0700

    KAFKA-10521; Skip partition watch registration when `AlterIsr` is expected (#9353)
    
    Before `AlterIsr`, which was introduced in KIP-497, the controller would register watches in Zookeeper for each reassigning partition so that it could be notified immediately when the ISR was expanded and the reassignment could be completed. This notification is not needed with the latest IBP when `AlterIsr` is enabled because the controller will execute all ISR changes itself.
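    
    As a minimal standalone sketch (simplified stand-ins, not Kafka's actual `ApiVersion` or
    `KafkaController` classes), the gate this patch introduces boils down to registering the
    per-partition Zookeeper watch only when the inter-broker protocol version predates `AlterIsr`:
    
        object IsrWatchGateSketch {
          // Hypothetical simplified stand-in for Kafka's IBP ordering.
          final case class Ibp(major: Int, minor: Int, subversion: Int) extends Ordered[Ibp] {
            def compare(that: Ibp): Int =
              Ordering[(Int, Int, Int)].compare(
                (major, minor, subversion), (that.major, that.minor, that.subversion))
          }
    
          // First IBP at which the controller expects ISR changes via `AlterIsr` (KAFKA_2_7_IV2).
          val FirstAlterIsrVersion: Ibp = Ibp(2, 7, 2)
    
          def isAlterIsrEnabled(interBrokerProtocolVersion: Ibp): Boolean =
            interBrokerProtocolVersion >= FirstAlterIsrVersion
    
          def main(args: Array[String]): Unit = {
            // Latest IBP: the controller applies ISR changes itself, so no watch is needed.
            println(isAlterIsrEnabled(Ibp(2, 7, 2))) // true  -> skip watch registration
            // Older IBP: fall back to registering the per-partition watch.
            println(isAlterIsrEnabled(Ibp(2, 7, 1))) // false -> register watch
          }
        }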
    
    There is one subtle detail. If we are in the middle of a roll in order to bump the IBP, then it is possible for the controller to be on the latest IBP while some of the brokers are still on the older one. In this case, the brokers on the older IBP will not send `AlterIsr`, but we can still rely on the delayed notification through the `isr_notifications` path to complete reassignments. This seems like a reasonable tradeoff since it should be a short window before the roll is completed.
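    
    The delayed notification mentioned above comes from the batching in
    `ReplicaManager.maybePropagateIsrChanges`, whose timing this patch makes configurable. A minimal
    standalone sketch of that flush decision (hypothetical `shouldPropagate` helper, not the real
    `ReplicaManager`):
    
        object IsrNotificationBatchSketch {
          final case class IsrChangePropagationConfig(lingerMs: Long, maxDelayMs: Long)
    
          // Pending ISR changes are flushed to the Zookeeper notification path once the set is
          // non-empty and either the linger period has elapsed since the last recorded change or
          // the maximum delay since the last propagation has been exceeded.
          def shouldPropagate(now: Long,
                              pendingChanges: Int,
                              lastIsrChangeMs: Long,
                              lastIsrPropagationMs: Long,
                              config: IsrChangePropagationConfig): Boolean =
            pendingChanges > 0 &&
              (lastIsrChangeMs + config.lingerMs < now ||
                lastIsrPropagationMs + config.maxDelayMs < now)
    
          def main(args: Array[String]): Unit = {
            val config = IsrChangePropagationConfig(lingerMs = 5000L, maxDelayMs = 60000L)
            // The last ISR change happened six seconds ago, so the linger period has elapsed.
            println(shouldPropagate(now = 10000L, pendingChanges = 1,
              lastIsrChangeMs = 4000L, lastIsrPropagationMs = 0L, config = config)) // true
          }
        }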
    
    Reviewers: David Jacot <dj...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 .../scala/kafka/controller/KafkaController.scala   | 65 +++++++++++++-----
 .../main/scala/kafka/server/ReplicaManager.scala   | 36 +++++++---
 .../admin/ReassignPartitionsIntegrationTest.scala  | 78 +++++++++++++++++-----
 3 files changed, 139 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 47b52c6..cc66060 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -84,6 +84,7 @@ class KafkaController(val config: KafkaConfig,
   @volatile private var brokerInfo = initialBrokerInfo
   @volatile private var _brokerEpoch = initialBrokerEpoch
 
+  private val isAlterIsrEnabled = config.interBrokerProtocolVersion >= KAFKA_2_7_IV2
   private val stateChangeLogger = new StateChangeLogger(config.brokerId, inControllerContext = true, None)
   val controllerContext = new ControllerContext
   var controllerChannelManager = new ControllerChannelManager(controllerContext, config, time, metrics,
@@ -789,8 +790,10 @@ class KafkaController(val config: KafkaConfig,
         stopRemovedReplicasOfReassignedPartition(topicPartition, unneededReplicas)
     }
 
-    val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
-    zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+    if (!isAlterIsrEnabled) {
+      val reassignIsrChangeHandler = new PartitionReassignmentIsrChangeHandler(eventManager, topicPartition)
+      zkClient.registerZNodeChangeHandler(reassignIsrChangeHandler)
+    }
 
     controllerContext.partitionsBeingReassigned.add(topicPartition)
   }
@@ -1089,17 +1092,21 @@ class KafkaController(val config: KafkaConfig,
   }
 
   private def unregisterPartitionReassignmentIsrChangeHandlers(): Unit = {
-    controllerContext.partitionsBeingReassigned.foreach { tp =>
-      val path = TopicPartitionStateZNode.path(tp)
-      zkClient.unregisterZNodeChangeHandler(path)
+    if (!isAlterIsrEnabled) {
+      controllerContext.partitionsBeingReassigned.foreach { tp =>
+        val path = TopicPartitionStateZNode.path(tp)
+        zkClient.unregisterZNodeChangeHandler(path)
+      }
     }
   }
 
   private def removePartitionFromReassigningPartitions(topicPartition: TopicPartition,
                                                        assignment: ReplicaAssignment): Unit = {
     if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-      val path = TopicPartitionStateZNode.path(topicPartition)
-      zkClient.unregisterZNodeChangeHandler(path)
+      if (!isAlterIsrEnabled) {
+        val path = TopicPartitionStateZNode.path(topicPartition)
+        zkClient.unregisterZNodeChangeHandler(path)
+      }
       maybeRemoveFromZkReassignment((tp, replicas) => tp == topicPartition && replicas == assignment.replicas)
       controllerContext.partitionsBeingReassigned.remove(topicPartition)
     } else {
@@ -1830,13 +1837,17 @@ class KafkaController(val config: KafkaConfig,
     if (!isActive) return
 
     if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
-      val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
-      if (isReassignmentComplete(topicPartition, reassignment)) {
-        // resume the partition reassignment process
-        info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
-          s"reassigning partition $topicPartition")
-        onPartitionReassignment(topicPartition, reassignment)
-      }
+      maybeCompleteReassignment(topicPartition)
+    }
+  }
+
+  private def maybeCompleteReassignment(topicPartition: TopicPartition): Unit = {
+    val reassignment = controllerContext.partitionFullReplicaAssignment(topicPartition)
+    if (isReassignmentComplete(topicPartition, reassignment)) {
+      // resume the partition reassignment process
+      info(s"Target replicas ${reassignment.targetReplicas} have all caught up with the leader for " +
+        s"reassigning partition $topicPartition")
+      onPartitionReassignment(topicPartition, reassignment)
     }
   }
 
@@ -2073,6 +2084,16 @@ class KafkaController(val config: KafkaConfig,
       if (partitions.nonEmpty) {
         updateLeaderAndIsrCache(partitions)
         processUpdateNotifications(partitions)
+
+        // During a partial upgrade, the controller may be on an IBP which assumes
+        // ISR changes through the `AlterIsr` API while some brokers are on an older
+        // IBP which assumes notification through Zookeeper. In this case, since the
+        // controller will not have registered watches for reassigning partitions, we
+        // can still rely on the batch ISR change notification path in order to
+        // complete the reassignment.
+        partitions.filter(controllerContext.partitionsBeingReassigned.contains).foreach { topicPartition =>
+          maybeCompleteReassignment(topicPartition)
+        }
       }
     } finally {
       // delete the notifications
@@ -2227,7 +2248,8 @@ class KafkaController(val config: KafkaConfig,
     eventManager.put(AlterIsrReceived(alterIsrRequest.brokerId, alterIsrRequest.brokerEpoch, isrsToAlter, responseCallback))
   }
 
-  private def processAlterIsr(brokerId: Int, brokerEpoch: Long, isrsToAlter: Map[TopicPartition, LeaderAndIsr],
+  private def processAlterIsr(brokerId: Int, brokerEpoch: Long,
+                              isrsToAlter: Map[TopicPartition, LeaderAndIsr],
                               callback: AlterIsrCallback): Unit = {
 
     // Handle a few short-circuits
@@ -2315,6 +2337,19 @@ class KafkaController(val config: KafkaConfig,
     }
 
     callback.apply(response)
+
+    // After we have returned the result of the `AlterIsr` request, we should check whether
+    // there are any reassignments which can be completed by a successful ISR expansion.
+    response.left.foreach { alterIsrResponses =>
+      alterIsrResponses.forKeyValue { (topicPartition, partitionResponse) =>
+        if (controllerContext.partitionsBeingReassigned.contains(topicPartition)) {
+          val isSuccessfulUpdate = partitionResponse.isRight
+          if (isSuccessfulUpdate) {
+            maybeCompleteReassignment(topicPartition)
+          }
+        }
+      }
+    }
   }
 
   private def processControllerChange(): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index c080d6a..9c7307b 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -164,10 +164,26 @@ object HostedPartition {
   final object Offline extends HostedPartition
 }
 
+case class IsrChangePropagationConfig(
+  // How often to check for ISR
+  checkIntervalMs: Long,
+
+  // Maximum time that an ISR change may be delayed before sending the notification
+  maxDelayMs: Long,
+
+  // Maximum time to await additional changes before sending the notification
+  lingerMs: Long
+)
+
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
-  val IsrChangePropagationBackoff = 5000L
-  val IsrChangePropagationInterval = 60000L
+
+  // This field is mutable to allow overriding change notification behavior in test cases
+  @volatile var DefaultIsrPropagationConfig: IsrChangePropagationConfig = IsrChangePropagationConfig(
+    checkIntervalMs = 2500,
+    lingerMs = 5000,
+    maxDelayMs = 60000,
+  )
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -233,9 +249,10 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = s"[ReplicaManager broker=$localBrokerId] "
   private val stateChangeLogger = new StateChangeLogger(localBrokerId, inControllerContext = false, None)
 
+  private val isrChangeNotificationConfig = ReplicaManager.DefaultIsrPropagationConfig
   private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
-  private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
-  private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
+  private val lastIsrChangeMs = new AtomicLong(time.milliseconds())
+  private val lastIsrPropagationMs = new AtomicLong(time.milliseconds())
 
   private var logDirFailureHandler: LogDirFailureHandler = null
 
@@ -278,7 +295,7 @@ class ReplicaManager(val config: KafkaConfig,
   def recordIsrChange(topicPartition: TopicPartition): Unit = {
     isrChangeSet synchronized {
       isrChangeSet += topicPartition
-      lastIsrChangeMs.set(System.currentTimeMillis())
+      lastIsrChangeMs.set(time.milliseconds())
     }
   }
   /**
@@ -289,11 +306,11 @@ class ReplicaManager(val config: KafkaConfig,
    * other brokers when large amount of ISR change occurs.
    */
   def maybePropagateIsrChanges(): Unit = {
-    val now = System.currentTimeMillis()
+    val now = time.milliseconds()
     isrChangeSet synchronized {
       if (isrChangeSet.nonEmpty &&
-        (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBackoff < now ||
-          lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
+        (lastIsrChangeMs.get() + isrChangeNotificationConfig.lingerMs < now ||
+          lastIsrPropagationMs.get() + isrChangeNotificationConfig.maxDelayMs < now)) {
         zkClient.propagateIsrChanges(isrChangeSet)
         isrChangeSet.clear()
         lastIsrPropagationMs.set(now)
@@ -324,7 +341,8 @@ class ReplicaManager(val config: KafkaConfig,
     scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
     // If using AlterIsr, we don't need the znode ISR propagation
     if (config.interBrokerProtocolVersion < KAFKA_2_7_IV2) {
-      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
+      scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _,
+        period = isrChangeNotificationConfig.checkIntervalMs, unit = TimeUnit.MILLISECONDS)
     } else {
       alterIsrManager.start()
     }
diff --git a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
index aeb5ac7..6d5d02d 100644
--- a/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/ReassignPartitionsIntegrationTest.scala
@@ -21,23 +21,24 @@ import java.io.Closeable
 import java.util.{Collections, HashMap, List}
 
 import kafka.admin.ReassignPartitionsCommand._
-import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.api.KAFKA_2_7_IV1
+import kafka.server.{IsrChangePropagationConfig, KafkaConfig, KafkaServer, ReplicaManager}
+import kafka.utils.Implicits._
 import kafka.utils.TestUtils
 import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, AlterConfigOp, ConfigEntry, DescribeLogDirsResult, NewTopic}
 import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
-import org.junit.rules.Timeout
+import org.apache.kafka.common.{TopicPartition, TopicPartitionReplica}
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
+import org.junit.rules.Timeout
 import org.junit.{After, Rule, Test}
 
-import scala.collection.Map
+import scala.collection.{Map, Seq, mutable}
 import scala.jdk.CollectionConverters._
-import scala.collection.{Seq, mutable}
 
 class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
   @Rule
@@ -45,10 +46,6 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
 
   var cluster: ReassignPartitionsTestCluster = null
 
-  def generateConfigs: Seq[KafkaConfig] = {
-    TestUtils.createBrokerConfigs(5, zkConnect).map(KafkaConfig.fromProps)
-  }
-
   @After
   override def tearDown(): Unit = {
     Utils.closeQuietly(cluster, "ReassignPartitionsTestCluster")
@@ -60,13 +57,53 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
       brokerId -> brokerLevelThrottles.map(throttle => (throttle, -1L)).toMap
     }.toMap
 
-  /**
-   * Test running a quick reassignment.
-   */
+
   @Test
   def testReassignment(): Unit = {
     cluster = new ReassignPartitionsTestCluster(zkConnect)
     cluster.setup()
+    executeAndVerifyReassignment()
+  }
+
+  @Test
+  def testReassignmentWithAlterIsrDisabled(): Unit = {
+    // Test reassignment when the IBP is on an older version which does not use
+    // the `AlterIsr` API. In this case, the controller will register individual
+    // watches for each reassigning partition so that the reassignment can be
+    // completed as soon as the ISR is expanded.
+    val configOverrides = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
+    cluster = new ReassignPartitionsTestCluster(zkConnect, configOverrides = configOverrides)
+    cluster.setup()
+    executeAndVerifyReassignment()
+  }
+
+  @Test
+  def testReassignmentCompletionDuringPartialUpgrade(): Unit = {
+    // Test reassignment during a partial upgrade when some brokers are relying on
+    // `AlterIsr` and some rely on the old notification logic through Zookeeper.
+    // In this test case, broker 0 starts up first on the latest IBP and is typically
+    // elected as controller. The three remaining brokers start up on the older IBP.
+    // We want to ensure that reassignment can still complete through the ISR change
+    // notification path even though the controller expects `AlterIsr`.
+
+    // Override change notification settings so that test is not delayed by ISR
+    // change notification delay
+    ReplicaManager.DefaultIsrPropagationConfig = IsrChangePropagationConfig(
+      checkIntervalMs = 500,
+      lingerMs = 100,
+      maxDelayMs = 500
+    )
+
+    val oldIbpConfig = Map(KafkaConfig.InterBrokerProtocolVersionProp -> KAFKA_2_7_IV1.version)
+    val brokerConfigOverrides = Map(1 -> oldIbpConfig, 2 -> oldIbpConfig, 3 -> oldIbpConfig)
+
+    cluster = new ReassignPartitionsTestCluster(zkConnect, brokerConfigOverrides = brokerConfigOverrides)
+    cluster.setup()
+
+    executeAndVerifyReassignment()
+  }
+
+  def executeAndVerifyReassignment(): Unit = {
     val assignment = """{"version":1,"partitions":""" +
       """[{"topic":"foo","partition":0,"replicas":[0,1,3],"log_dirs":["any","any","any"]},""" +
       """{"topic":"bar","partition":0,"replicas":[3,2,0],"log_dirs":["any","any","any"]}""" +
@@ -594,7 +631,11 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     }
   }
 
-  class ReassignPartitionsTestCluster(val zkConnect: String) extends Closeable {
+  class ReassignPartitionsTestCluster(
+    val zkConnect: String,
+    configOverrides: Map[String, String] = Map.empty,
+    brokerConfigOverrides: Map[Int, Map[String, String]] = Map.empty
+  ) extends Closeable {
     val brokers = Map(
       0 -> "rack0",
       1 -> "rack0",
@@ -622,6 +663,12 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
         // Don't move partition leaders automatically.
         config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
         config.setProperty(KafkaConfig.ReplicaLagTimeMaxMsProp, "1000")
+        configOverrides.forKeyValue(config.setProperty)
+
+        brokerConfigOverrides.get(brokerId).foreach { overrides =>
+          overrides.forKeyValue(config.setProperty)
+        }
+
         config
     }.toBuffer
 
@@ -637,9 +684,8 @@ class ReassignPartitionsIntegrationTest extends ZooKeeperTestHarness {
     }
 
     def createServers(): Unit = {
-      brokers.keySet.foreach {
-        case brokerId =>
-          servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
+      brokers.keySet.foreach { brokerId =>
+        servers += TestUtils.createServer(KafkaConfig(brokerConfigs(brokerId)))
       }
     }