You are viewing a plain-text version of this content. The hyperlink to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/08/13 23:54:54 UTC
kafka git commit: KAFKA-2406: Throttle ISR propagation
Repository: kafka
Updated Branches:
refs/heads/trunk e2ebae809 -> 2c55bd8aa
KAFKA-2406: Throttle ISR propagation
This is a follow-up patch for KAFKA-2406. Further testing is needed to verify whether this change alone is enough to solve the problem.
Author: Jiangjie Qin <be...@gmail.com>
Author: Jiangjie Qin <jq...@jqin-ld1.linkedin.biz>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #114 from becketqin/KAFKA-2406
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2c55bd8a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2c55bd8a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2c55bd8a
Branch: refs/heads/trunk
Commit: 2c55bd8aa2666ea11ee2814b04f782eef8fa52c6
Parents: e2ebae8
Author: Jiangjie Qin <be...@gmail.com>
Authored: Thu Aug 13 17:54:36 2015 -0400
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Aug 13 17:54:36 2015 -0400
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 2 +
.../scala/kafka/common/TopicAndPartition.scala | 4 --
.../kafka/controller/KafkaController.scala | 47 ++++++++++++--------
.../main/scala/kafka/server/KafkaConfig.scala | 7 +--
.../scala/kafka/server/ReplicaManager.scala | 43 +++++++++++-------
.../scala/kafka/utils/ReplicationUtils.scala | 23 ++++++----
core/src/main/scala/kafka/utils/ZkUtils.scala | 19 ++++----
.../kafka/integration/TopicMetadataTest.scala | 2 +-
8 files changed, 88 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 511d3c9..ee332ed 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -428,7 +428,9 @@ class Partition(val topic: String,
val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkClient, topic, partitionId,
newLeaderAndIsr, controllerEpoch, zkVersion)
+
if(updateSucceeded) {
+ replicaManager.recordIsrChange(new TopicAndPartition(topic, partitionId))
inSyncReplicas = newIsr
zkVersion = newVersion
trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index 13a3f28..95db1dc 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -25,8 +25,6 @@ import kafka.utils.Json
*/
case class TopicAndPartition(topic: String, partition: Int) {
- private val version: Long = 1L
-
def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
def this(partition: Partition) = this(partition.topic, partition.partitionId)
@@ -36,6 +34,4 @@ case class TopicAndPartition(topic: String, partition: Int) {
def asTuple = (topic, partition)
override def toString = "[%s,%d]".format(topic, partition)
-
- def toJson = Json.encode(Map("version" -> version, "topic" -> topic, "partition" -> partition))
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 68536f5..4c37616 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -923,7 +923,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
private def registerIsrChangeNotificationListener() = {
debug("Registering IsrChangeNotificationListener")
- ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath)
zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener)
}
@@ -1339,7 +1338,6 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
* @param controller
*/
class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
- var topicAndPartitionSet: Set[TopicAndPartition] = Set()
override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
import scala.collection.JavaConverters._
@@ -1347,23 +1345,25 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
inLock(controller.controllerContext.controllerLock) {
debug("[IsrChangeNotificationListener] Fired!!!")
val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
- val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
- controller.updateLeaderAndIsrCache(topicAndPartitions)
- processUpdateNotifications(topicAndPartitions)
-
- // delete processed children
- childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
- ZkUtils.getEntityConfigPath(ConfigType.Topic, x)))
+ try {
+ val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
+ controller.updateLeaderAndIsrCache(topicAndPartitions)
+ processUpdateNotifications(topicAndPartitions)
+ } finally {
+ // delete processed children
+ childrenAsScala.map(x => ZkUtils.deletePath(controller.controllerContext.zkClient,
+ ZkUtils.IsrChangeNotificationPath + "/" + x))
+ }
}
}
private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
- controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
+ controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
}
- private def getTopicAndPartition(child: String): Option[TopicAndPartition] = {
+ private def getTopicAndPartition(child: String): Set[TopicAndPartition] = {
val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode)
if (jsonOpt.isDefined) {
@@ -1371,20 +1371,31 @@ class IsrChangeNotificationListener(controller: KafkaController) extends IZkChil
json match {
case Some(m) =>
- val topicAndPartition = m.asInstanceOf[Map[String, Any]]
- val topic = topicAndPartition("topic").asInstanceOf[String]
- val partition = topicAndPartition("partition").asInstanceOf[Int]
- Some(TopicAndPartition(topic, partition))
+ val topicAndPartitions: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
+ val isrChanges = m.asInstanceOf[Map[String, Any]]
+ val topicAndPartitionList = isrChanges("partitions").asInstanceOf[List[Any]]
+ topicAndPartitionList.foreach {
+ case tp =>
+ val topicAndPartition = tp.asInstanceOf[Map[String, Any]]
+ val topic = topicAndPartition("topic").asInstanceOf[String]
+ val partition = topicAndPartition("partition").asInstanceOf[Int]
+ topicAndPartitions += TopicAndPartition(topic, partition)
+ }
+ topicAndPartitions
case None =>
- error("Invalid topic and partition JSON: " + json + " in ZK: " + changeZnode)
- None
+ error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode)
+ Set.empty
}
} else {
- None
+ Set.empty
}
}
}
+object IsrChangeNotificationListener {
+ val version: Long = 1L
+}
+
/**
* Starts the preferred replica leader election for the list of partitions specified under
* /admin/preferred_replica_election -
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index dbe170f..a06f0bd 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -26,10 +26,11 @@ import kafka.consumer.ConsumerConfig
import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, MessageSet}
import kafka.utils.CoreUtils
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.{ConfigException, AbstractConfig, ConfigDef}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
import org.apache.kafka.common.metrics.MetricsReporter
import org.apache.kafka.common.protocol.SecurityProtocol
-import scala.collection.{mutable, immutable, JavaConversions, Map}
+
+import scala.collection.{Map, immutable}
object Defaults {
/** ********* Zookeeper Configuration ***********/
@@ -531,7 +532,7 @@ object KafkaConfig {
* Check that property names are valid
*/
def validateNames(props: Properties) {
- import JavaConversions._
+ import scala.collection.JavaConversions._
val names = configDef.names()
for (name <- props.keys)
require(names.contains(name), "Unknown configuration \"%s\".".format(name))
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 795220e..2e0bbcd 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,30 +16,24 @@
*/
package kafka.server
+import java.io.{File, IOException}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.yammer.metrics.core.Gauge
import kafka.api._
-import kafka.common._
-import kafka.utils._
import kafka.cluster.{BrokerEndPoint, Partition, Replica}
-import kafka.log.{LogAppendInfo, LogManager}
-import kafka.metrics.KafkaMetricsGroup
+import kafka.common._
import kafka.controller.KafkaController
+import kafka.log.{LogAppendInfo, LogManager}
import kafka.message.{ByteBufferMessageSet, MessageSet}
-import kafka.api.ProducerResponseStatus
-import kafka.common.TopicAndPartition
-import kafka.api.PartitionFetchInfo
-
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils._
+import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.common.protocol.Errors
-import java.util.concurrent.atomic.AtomicBoolean
-import java.io.{IOException, File}
-import java.util.concurrent.TimeUnit
-
-import scala.Some
import scala.collection._
-import org.I0Itec.zkclient.ZkClient
-import com.yammer.metrics.core.Gauge
-
/*
* Result metadata of a log append operation on the log
*/
@@ -116,6 +110,7 @@ class ReplicaManager(val config: KafkaConfig,
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
+ private val isrChangeSet: mutable.Set[TopicAndPartition] = new mutable.HashSet[TopicAndPartition]()
val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
@@ -154,6 +149,21 @@ class ReplicaManager(val config: KafkaConfig,
scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.replicaHighWatermarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
+ def recordIsrChange(topicAndPartition: TopicAndPartition) {
+ isrChangeSet synchronized {
+ isrChangeSet += topicAndPartition
+ }
+ }
+
+ def maybePropagateIsrChanges() {
+ isrChangeSet synchronized {
+ if (isrChangeSet.nonEmpty) {
+ ReplicationUtils.propagateIsrChanges(zkClient, isrChangeSet)
+ isrChangeSet.clear()
+ }
+ }
+ }
+
/**
* Try to complete some delayed produce requests with the request key;
* this can be triggered when:
@@ -181,6 +191,7 @@ class ReplicaManager(val config: KafkaConfig,
def startup() {
// start ISR expiration thread
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs, unit = TimeUnit.MILLISECONDS)
+ scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 5000, unit = TimeUnit.MILLISECONDS)
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 783ba10..d99629a 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -19,7 +19,7 @@ package kafka.utils
import kafka.api.LeaderAndIsr
import kafka.common.TopicAndPartition
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{IsrChangeNotificationListener, LeaderIsrAndControllerEpoch}
import org.I0Itec.zkclient.ZkClient
import org.apache.zookeeper.data.Stat
@@ -27,7 +27,7 @@ import scala.collection._
object ReplicationUtils extends Logging {
- val IsrChangeNotificationPrefix = "isr_change_"
+ private val IsrChangeNotificationPrefix = "isr_change_"
def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
zkVersion: Int): (Boolean,Int) = {
@@ -36,16 +36,16 @@ object ReplicationUtils extends Logging {
val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
- if (updatePersistentPath._1) {
- val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId)
- val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
- zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
- topicAndPartition.toJson)
- debug("Added " + isrChangeNotificationPath + " for " + topicAndPartition)
- }
updatePersistentPath
}
+ def propagateIsrChanges(zkClient: ZkClient, isrChangeSet: Set[TopicAndPartition]): Unit = {
+ val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
+ zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
+ generateIsrChangeJson(isrChangeSet))
+ debug("Added " + isrChangeNotificationPath + " for " + isrChangeSet)
+ }
+
def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {
try {
val writtenLeaderAndIsrInfo = ZkUtils.readDataMaybeNull(zkClient, path)
@@ -89,4 +89,9 @@ object ReplicationUtils extends Logging {
Some(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch))}
}
+ private def generateIsrChangeJson(isrChanges: Set[TopicAndPartition]): String = {
+ val partitions = isrChanges.map(tp => Map("topic" -> tp.topic, "partition" -> tp.partition)).toArray
+ Json.encode(Map("version" -> IsrChangeNotificationListener.version, "partitions" -> partitions))
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 4ae310e..74b587e 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -49,6 +49,16 @@ object ZkUtils extends Logging {
val IsrChangeNotificationPath = "/isr_change_notification"
val EntityConfigPath = "/config"
val EntityConfigChangesPath = "/config/changes"
+ // These are persistent ZK paths that should exist on kafka broker startup.
+ val persistentZkPaths = Seq(ConsumersPath,
+ BrokerIdsPath,
+ BrokerTopicsPath,
+ EntityConfigChangesPath,
+ ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
+ ZkUtils.getEntityConfigRootPath(ConfigType.Client),
+ DeleteTopicsPath,
+ BrokerSequenceIdPath,
+ IsrChangeNotificationPath)
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
@@ -97,14 +107,7 @@ object ZkUtils extends Logging {
}
def setupCommonPaths(zkClient: ZkClient) {
- for(path <- Seq(ConsumersPath,
- BrokerIdsPath,
- BrokerTopicsPath,
- EntityConfigChangesPath,
- ZkUtils.getEntityConfigRootPath(ConfigType.Topic),
- ZkUtils.getEntityConfigRootPath(ConfigType.Client),
- DeleteTopicsPath,
- BrokerSequenceIdPath))
+ for(path <- persistentZkPaths)
makeSurePersistentPathExists(zkClient, path)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/2c55bd8a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 61451a2..24f0a07 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -195,7 +195,7 @@ class TopicMetadataTest extends ZooKeeperTestHarness {
metadata.topicsMetadata.head.partitionsMetadata.nonEmpty)
metadata.topicsMetadata.head.partitionsMetadata.head.isr
else
- ""))
+ ""), 6000L)
})
}