You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was lost in this plain-text rendering).
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/13 23:54:54 UTC

kafka git commit: KAFKA-2406: Throttle ISR propagation

Repository: kafka
Updated Branches:
  refs/heads/trunk e2ebae809 -> 2c55bd8aa


KAFKA-2406: Throttle ISR propagation

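This is a follow-up patch for KAFKA-2406. Instead of writing one ISR-change notification znode to ZooKeeper for every partition ISR update, each broker now records changed partitions in memory and writes them as a single batched notification every 5 seconds; the controller parses the batched partition list and deletes the processed notification znodes. Further testing is needed to verify whether this change alone is enough to solve the problem.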

Author: Jiangjie Qin <be...@gmail.com>
Author: Jiangjie Qin <jq...@jqin-ld1.linkedin.biz>

Reviewers: Jun Rao <ju...@gmail.com>

Closes #114 from becketqin/KAFKA-2406


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2c55bd8a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2c55bd8a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2c55bd8a

Branch: refs/heads/trunk
Commit: 2c55bd8aa2666ea11ee2814b04f782eef8fa52c6
Parents: e2ebae8
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Aug 13 17:54:36 2015 -0400
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Aug 13 17:54:36 2015 -0400

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  2 +
 .../scala/kafka/common/TopicAndPartition.scala  |  4 --
 .../kafka/controller/KafkaController.scala      | 47 ++++++++++++--------
 .../main/scala/kafka/server/KafkaConfig.scala   |  7 +--
 .../scala/kafka/server/ReplicaManager.scala     | 43 +++++++++++-------
 .../scala/kafka/utils/ReplicationUtils.scala    | 23 ++++++----
 core/src/main/scala/kafka/utils/ZkUtils.scala   | 19 ++++----
 .../kafka/integration/TopicMetadataTest.scala   |  2 +-
 8 files changed, 88 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 511d3c9..ee332ed 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -428,7 +428,9 @@ class Partition(val topic: String,
     val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
       newLeaderAndIsr, controllerEpoch, zkVersion)
+
     if(updateSucceeded) {
+      replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
       inSyncReplicas = newIsr
       zkVersion = newVersion
       trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 13a3f28..95db1dc 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -25,8 +25,6 @@ import kafka.utils.Json
  */
 case class TopicAndPartition(topic: String, partition: Int) {
 
-  private val version: Long = 1L
-
   def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
 
   def this(partition: Partition) = this(partition.topic, partition.partitionId)
@@ -36,6 +34,4 @@ case class TopicAndPartition(topic: String, partition: Int) {
   def asTuple = (topic, partition)
 
   override def toString = "[%s,%d]".format(topic, partition)
-
-  def toJson = Json.encode(Map("version" -> version, "topic" -> topic, "partition" -> partition))
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 68536f5..4c37616 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -923,7 +923,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
 
   private def registerIsrChangeNotificationListener() = {
     debug("Registering IsrChangeNotificationListener")
-    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath)
     zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
   }
 
@@ -1339,7 +1338,6 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
  * @param controller
  */
 class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
-  var topicAndPartitionSet: Set[TopicAndPartition] = Set()
 
   override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
     import scala.collection.JavaConverters._
@@ -1347,23 +1345,25 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
     inLock(controller.controllerContext.controllerLock) {
       debug("[IsrChangeNotificationListener] Fired!!!")
       val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
-      val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
-      controller.updateLeaderAndIsrCache(topicAndPartitions)
-      processUpdateNotifications(topicAndPartitions)
-
-      // delete processed children
-      childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
-                                                  ZkUtils.getEntityConfigPath(ConfigType.Topic, x)))
+      try {
+        val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
+        controller.updateLeaderAndIsrCache(topicAndPartitions)
+        processUpdateNotifications(topicAndPartitions)
+      } finally {
+        // delete processed children
+        childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
+          ZkUtils.IsrChangeNotificationPath + "/" + x))
+      }
     }
   }
 
   private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
     val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
-    controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
     debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
+    controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
   }
 
-  private def getTopicAndPartition(child: String): Option[TopicAndPartition] = {
+  private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
     val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
     val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode)
     if (jsonOpt.isDefined) {
@@ -1371,20 +1371,31 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
 
       json match {
         case Some(m) =>
-          val topicAndPartition = m.asInstanceOf[Map[String, Any]]
-          val topic = topicAndPartition("topic").asInstanceOf[String]
-          val partition = topicAndPartition("partition").asInstanceOf[Int]
-          Some(TopicAndPartition(topic, partition))
+          val topicAndPartitions: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
+          val isrChanges = m.asInstanceOf[Map[String, Any]]
+          val topicAndPartitionList = isrChanges("partitions").asInstanceOf[List[Any]]
+          topicAndPartitionList.foreach {
+            case tp =>
+              val topicAndPartition = tp.asInstanceOf[Map[String, Any]]
+              val topic = topicAndPartition("topic").asInstanceOf[String]
+              val partition = topicAndPartition("partition").asInstanceOf[Int]
+              topicAndPartitions += TopicAndPartition(topic, partition)
+          }
+          topicAndPartitions
         case None =>
-          error("Invalid topic and partition JSON: " + json + " in ZK: " + changeZnode)
-          None
+          error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
+          Set.empty
       }
     } else {
-      None
+      Set.empty
     }
   }
 }
 
+object IsrChangeNotificationListener {
+  val version: Long = 1L
+}
+
 /**
  * Starts the preferred replica leader election for the list of partitions specified under
  * /admin/preferred_replica_election -

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index dbe170f..a06f0bd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,10 +26,11 @@ import kafka.consumer.ConsumerConfig
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
 import kafka.utils.CoreUtils
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.metrics.MetricsReporter
 import org.apache.kafka.common.protocol.SecurityProtocol
-import scala.collection.{mutable, immutable, JavaConversions, Map}
+
+import scala.collection.{Map, immutable}
 
 object Defaults {
   /** ********* Zookeeper Configuration ***********/
@@ -531,7 +532,7 @@ object KafkaConfig {
    * Check that property names are valid
    */
   def validateNames(props: Properties) {
-    import JavaConversions._
+    import scala.collection.JavaConversions._
     val names = configDef.names()
     for (name <- props.keys)
       require(names.contains(name), "Unknown configuration \"%s\".".format(name))

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 795220e..2e0bbcd 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,30 +16,24 @@
  */
 package kafka.server
 
+import java.io.{File, IOException}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.yammer.metrics.core.Gauge
 import kafka.api._
-import kafka.common._
-import kafka.utils._
 import kafka.cluster.{BrokerEndPoint, Partition, Replica}
-import kafka.log.{LogAppendInfo, LogManager}
-import kafka.metrics.KafkaMetricsGroup
+import kafka.common._
 import kafka.controller.KafkaController
+import kafka.log.{LogAppendInfo, LogManager}
 import kafka.message.{ByteBufferMessageSet, MessageSet}
-import kafka.api.ProducerResponseStatus
-import kafka.common.TopicAndPartition
-import kafka.api.PartitionFetchInfo
-
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.common.protocol.Errors
 
-import java.util.concurrent.atomic.AtomicBoolean
-import java.io.{IOException, File}
-import java.util.concurrent.TimeUnit
-
-import scala.Some
 import scala.collection._
 
-import org.I0Itec.zkclient.ZkClient
-import com.yammer.metrics.core.Gauge
-
 /*
  * Result metadata of a log append operation on the log
  */
@@ -116,6 +110,7 @@ class ReplicaManager(val config: KafkaConfig,
   private var hwThreadInitialized = false
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
+  private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
 
   val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
     purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
@@ -154,6 +149,21 @@ class ReplicaManager(val config: KafkaConfig,
       scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
   }
 
+  def recordIsrChange(topicAndPartition: TopicAndPartition) {
+    isrChangeSet synchronized {
+      isrChangeSet += topicAndPartition
+    }
+  }
+
+  def maybePropagateIsrChanges() {
+    isrChangeSet synchronized {
+      if (isrChangeSet.nonEmpty) {
+        ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
+        isrChangeSet.clear()
+      }
+    }
+  }
+
   /**
    * Try to complete some delayed produce requests with the request key;
    * this can be triggered when:
@@ -181,6 +191,7 @@ class ReplicaManager(val config: KafkaConfig,
   def startup() {
     // start ISR expiration thread
     scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
+    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 5000, unit = TimeUnit.MILLISECONDS)
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 783ba10..d99629a 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -19,7 +19,7 @@ package kafka.utils
 
 import kafka.api.LeaderAndIsr
 import kafka.common.TopicAndPartition
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
 import org.I0Itec.zkclient.ZkClient
 import org.apache.zookeeper.data.Stat
 
@@ -27,7 +27,7 @@ import scala.collection._
 
 object ReplicationUtils extends Logging {
 
-  val IsrChangeNotificationPrefix = "isr_change_"
+  private val IsrChangeNotificationPrefix = "isr_change_"
 
   def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
     zkVersion: Int): (Boolean,Int) = {
@@ -36,16 +36,16 @@ object ReplicationUtils extends Logging {
     val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
     val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
-    if (updatePersistentPath._1) {
-      val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId)
-      val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
-        zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
-        topicAndPartition.toJson)
-      debug("Added " + isrChangeNotificationPath + " for " + topicAndPartition)
-    }
     updatePersistentPath
   }
 
+  def propagateIsrChanges(zkClient: ZkClient, isrChangeSet: Set[TopicAndPartition]): Unit = {
+    val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
+      zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
+      generateIsrChangeJson(isrChangeSet))
+    debug("Added " + isrChangeNotificationPath + " for " + isrChangeSet)
+  }
+
   def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {
     try {
       val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, path)
@@ -89,4 +89,9 @@ object ReplicationUtils extends Logging {
       Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
   }
 
+  private def generateIsrChangeJson(isrChanges: Set[TopicAndPartition]): String = {
+    val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
+    Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitions))
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4ae310e..74b587e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -49,6 +49,16 @@ object ZkUtils extends Logging {
   val IsrChangeNotificationPath = "/isr_change_notification"
   val EntityConfigPath = "/config"
   val EntityConfigChangesPath = "/config/changes"
+  // These are persistent ZK paths that should exist on kafka broker startup.
+  val persistentZkPaths = Seq(ConsumersPath,
+                              BrokerIdsPath,
+                              BrokerTopicsPath,
+                              EntityConfigChangesPath,
+                              ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
+                              ZkUtils.getEntityConfigRootPath(ConfigType.Client),
+                              DeleteTopicsPath,
+                              BrokerSequenceIdPath,
+                              IsrChangeNotificationPath)
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic
@@ -97,14 +107,7 @@ object ZkUtils extends Logging {
   }
 
   def setupCommonPaths(zkClient: ZkClient) {
-    for(path <- Seq(ConsumersPath,
-                    BrokerIdsPath,
-                    BrokerTopicsPath,
-                    EntityConfigChangesPath,
-                    ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
-                    ZkUtils.getEntityConfigRootPath(ConfigType.Client),
-                    DeleteTopicsPath,
-                    BrokerSequenceIdPath))
+    for(path <- persistentZkPaths)
       makeSurePersistentPathExists(zkClient, path)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 61451a2..24f0a07 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -195,7 +195,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
                                 metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
                               metadata.topicsMetadata.head.partitionsMetadata.head.isr
                             else
-                              ""))
+                              ""), 6000L)
     })
   }