You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/07/07 18:48:42 UTC
kafka git commit: kafka-1367;
Broker topic metadata not kept in sync with ZooKeeper; patched by Ashish Singh;
reviewed by Jun Rao
Repository: kafka
Updated Branches:
refs/heads/trunk 271b18d11 -> f77dc386c
kafka-1367; Broker topic metadata not kept in sync with ZooKeeper; patched by Ashish Singh; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f77dc386
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f77dc386
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f77dc386
Branch: refs/heads/trunk
Commit: f77dc386c099da5ff0bac4d2a12b04f7f17f07d3
Parents: 271b18d
Author: Ashish Singh <as...@cloudera.com>
Authored: Tue Jul 7 09:45:26 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 7 09:45:45 2015 -0700
----------------------------------------------------------------------
.../scala/kafka/common/TopicAndPartition.scala | 6 +-
.../kafka/controller/KafkaController.scala | 69 ++++++++++++++--
.../scala/kafka/utils/ReplicationUtils.scala | 16 +++-
core/src/main/scala/kafka/utils/ZkUtils.scala | 1 +
.../kafka/integration/TopicMetadataTest.scala | 84 +++++++++++++++++---
.../unit/kafka/utils/ReplicationUtilsTest.scala | 2 +
6 files changed, 158 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index df3db91..13a3f28 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,6 +1,7 @@
package kafka.common
import kafka.cluster.{Replica, Partition}
+import kafka.utils.Json
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
@@ -24,6 +25,8 @@ import kafka.cluster.{Replica, Partition}
*/
case class TopicAndPartition(topic: String, partition: Int) {
+ private val version: Long = 1L
+
def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
def this(partition: Partition) = this(partition.topic, partition.partitionId)
@@ -33,5 +36,6 @@ case class TopicAndPartition(topic: String, partition: Int) {
def asTuple = (topic, partition)
override def toString = "[%s,%d]".format(topic, partition)
-}
+ def toJson = Json.encode(Map("version" -> version, "topic" -> topic, "partition" -> partition))
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3635057..09630d0 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,8 +16,9 @@
*/
package kafka.controller
-import collection._
-import collection.Set
+import java.util
+
+import scala.collection._
import com.yammer.metrics.core.Gauge
import java.util.concurrent.TimeUnit
import kafka.admin.AdminUtils
@@ -31,7 +32,7 @@ import kafka.utils.ZkUtils._
import kafka.utils._
import kafka.utils.CoreUtils._
import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
@@ -169,6 +170,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
private val partitionReassignedListener = new PartitionsReassignedListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
+ private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
newGauge(
"ActiveControllerCount",
@@ -307,6 +309,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
incrementControllerEpoch(zkClient)
// before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
registerReassignedPartitionsListener()
+ registerIsrChangeNotificationListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
@@ -792,8 +795,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
controllerContext.controllerChannelManager.startup()
}
- private def updateLeaderAndIsrCache() {
- val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet)
+ def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
+ val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)
for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
}
@@ -892,6 +895,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
+ private def registerIsrChangeNotificationListener() = {
+   // Make sure the notification parent znode is present, then subscribe the listener
+   // to changes in its children.
+   val notificationPath = ZkUtils.IsrChangeNotificationPath
+   debug("Registering IsrChangeNotificationListener")
+   ZkUtils.makeSurePersistentPathExists(zkClient, notificationPath)
+   zkClient.subscribeChildChanges(notificationPath, isrChangeNotificationListener)
+ }
+
private def deregisterReassignedPartitionsListener() = {
zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
}
@@ -1281,6 +1290,56 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
}
/**
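+ * Invoked when a child znode is added under the ISR change notification path, i.e. when a
+ * partition leader signals that the ISR of one of its partitions has changed.
+ * @param controller the controller whose leader/ISR cache is refreshed and which sends the
+ *                   resulting metadata updates to the brokers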
+ */
+class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
+  // Not read or written anywhere inside this class; kept so the class's public members are unchanged.
+  var topicAndPartitionSet: Set[TopicAndPartition] = Set()
+
+  /**
+   * Processes the notification znodes found under the watched parent: refreshes the controller's
+   * leader/ISR cache for the partitions they name, sends the updated metadata to the brokers and
+   * then deletes the processed notification znodes.
+   *
+   * @param parentPath      the znode whose children changed
+   * @param currentChildren names of the children currently under parentPath; treated as empty
+   *                        when null
+   */
+  override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
+    import scala.collection.JavaConverters._
+
+    inLock(controller.controllerContext.controllerLock) {
+      debug("[IsrChangeNotificationListener] Fired!!!")
+      // ZkClient may hand over null instead of a list when the parent path does not exist.
+      val childrenAsScala: mutable.Buffer[String] =
+        Option(currentChildren).map(_.asScala).getOrElse(mutable.Buffer.empty[String])
+      val topicAndPartitions: immutable.Set[TopicAndPartition] =
+        childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet
+
+      // Nothing to refresh or propagate when no notification could be decoded, e.g. when this
+      // callback is triggered by the deletions performed below.
+      if (topicAndPartitions.nonEmpty) {
+        controller.updateLeaderAndIsrCache(topicAndPartitions)
+        processUpdateNotifications(topicAndPartitions)
+      }
+
+      // Delete the processed children from the same path they were read from
+      // (IsrChangeNotificationPath); deleting under another path would leave them in place.
+      childrenAsScala.foreach(x =>
+        ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.IsrChangeNotificationPath + "/" + x))
+    }
+  }
+
+  /**
+   * Sends an update-metadata request for the given partitions to every live or shutting-down broker.
+   */
+  private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
+    val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
+    controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
+    debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
+  }
+
+  /**
+   * Reads one notification znode and decodes the topic and partition it names.
+   *
+   * @param child name of the notification znode, relative to the ISR change notification path
+   * @return the decoded partition, or None when the znode has no data or its content is not a
+   *         JSON object with a string "topic" and an integer "partition"
+   */
+  private def getTopicAndPartition(child: String): Option[TopicAndPartition] = {
+    val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
+    val (jsonOpt, _) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode)
+
+    jsonOpt.flatMap { rawJson =>
+      // Log the raw payload so a malformed notification can be diagnosed from the log alone.
+      def invalid(): Option[TopicAndPartition] = {
+        error("Invalid topic and partition JSON: " + rawJson + " in ZK: " + changeZnode)
+        None
+      }
+
+      Json.parseFull(rawJson) match {
+        case Some(m: Map[_, _]) =>
+          val fields = m.asInstanceOf[Map[String, Any]]
+          (fields.get("topic"), fields.get("partition")) match {
+            case (Some(topic: String), Some(partition: Int)) => Some(TopicAndPartition(topic, partition))
+            case _ => invalid()
+          }
+        case _ =>
+          invalid()
+      }
+    }
+  }
+}
+
+/**
* Starts the preferred replica leader election for the list of partitions specified under
* /admin/preferred_replica_election -
*/
http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 6068733..783ba10 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,22 +18,32 @@
package kafka.utils
import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
import kafka.controller.LeaderIsrAndControllerEpoch
-import org.apache.zookeeper.data.Stat
import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.data.Stat
-import scala.Some
import scala.collection._
object ReplicationUtils extends Logging {
+ val IsrChangeNotificationPrefix = "isr_change_"
+
def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
zkVersion: Int): (Boolean,Int) = {
debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(",")))
val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
// use the epoch of the controller that made the leadership decision, instead of the current controller epoch
- ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+ val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+ if (updatePersistentPath._1) {
+ val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId)
+ val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
+ zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
+ topicAndPartition.toJson)
+ debug("Added " + isrChangeNotificationPath + " for " + topicAndPartition)
+ }
+ updatePersistentPath
}
def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 78475e3..166814c 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -47,6 +47,7 @@ object ZkUtils extends Logging {
val DeleteTopicsPath = "/admin/delete_topics"
val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
val BrokerSequenceIdPath = "/brokers/seqid"
+ val IsrChangeNotificationPath = "/isr_change_notification"
def getTopicPath(topic: String): String = {
BrokerTopicsPath + "/" + topic
http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 995b059..a95ee5e 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,28 +17,32 @@
package kafka.integration
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
import java.nio.ByteBuffer
+
import junit.framework.Assert._
-import kafka.cluster.{BrokerEndPoint, Broker}
+import kafka.admin.AdminUtils
+import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
+import kafka.client.ClientUtils
+import kafka.cluster.{Broker, BrokerEndPoint}
+import kafka.common.ErrorMapping
+import kafka.server.{NotRunning, KafkaConfig, KafkaServer}
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
-import kafka.server.{KafkaServer, KafkaConfig}
-import kafka.api.TopicMetadataRequest
-import kafka.common.ErrorMapping
-import kafka.client.ClientUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.scalatest.junit.JUnit3Suite
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
private var server1: KafkaServer = null
var brokerEndPoints: Seq[BrokerEndPoint] = null
+ var adHocConfigs: Seq[KafkaConfig] = null
+ val numConfigs: Int = 2
override def setUp() {
super.setUp()
- val props = createBrokerConfigs(1, zkConnect)
- val configs = props.map(KafkaConfig.fromProps)
+ val props = createBrokerConfigs(numConfigs, zkConnect)
+ val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
+ adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
server1 = TestUtils.createServer(configs.head)
brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
}
@@ -130,4 +134,62 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
assertEquals(1, partitionMetadata.head.replicas.size)
assertTrue(partitionMetadata.head.leader.isDefined)
}
+
+ private def checkIsr(servers: Seq[KafkaServer]): Unit = {
+   // Endpoint used to reach the given broker; an empty configured host name falls back to "localhost".
+   def endPointOf(server: KafkaServer): BrokerEndPoint = {
+     val host = if (server.config.hostName.nonEmpty) server.config.hostName else "localhost"
+     new BrokerEndPoint(server.config.brokerId, host, server.boundPort())
+   }
+
+   val runningServers: Seq[KafkaServer] = servers.filterNot(_.brokerState.currentState == NotRunning.state)
+   val expectedIsr: Seq[BrokerEndPoint] = runningServers.map(endPointOf)
+
+   // Every running broker must report the expected ISR for the first partition of the first topic
+   for (server <- runningServers) {
+     var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+
+     // ISR of the first partition of the first topic in the latest response, if there is one.
+     def firstIsr =
+       metadata.topicsMetadata.headOption.flatMap(_.partitionsMetadata.headOption).map(_.isr)
+
+     waitUntilTrue(() => {
+       metadata = ClientUtils.fetchTopicMetadata(
+         Set.empty,
+         Seq(endPointOf(server)),
+         "TopicMetadataTest-testBasicTopicMetadata",
+         2000, 0)
+       firstIsr.exists(isr => expectedIsr == isr)
+     },
+       "Topic metadata is not correctly updated for broker " + server + ".\n" +
+       "Expected ISR: " + expectedIsr + "\n" +
+       "Actual ISR : " + firstIsr.getOrElse(""))
+   }
+ }
+
+
+ def testIsrAfterBrokerShutDownAndJoinsBack {
+   // Verifies that, after one broker is stopped and started again, every running broker
+   // serves topic metadata whose ISR matches the set of running brokers.
+
+   // start adHoc brokers
+   val adHocServers = adHocConfigs.map(p => createServer(p))
+   val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
+
+   try {
+     // create topic
+     val topic: String = "test"
+     AdminUtils.createTopic(zkClient, topic, 1, numConfigs)
+
+     // shutdown a broker
+     adHocServers.last.shutdown()
+     adHocServers.last.awaitShutdown()
+
+     // startup a broker
+     adHocServers.last.startup()
+
+     // check metadata is still correct and updated at all brokers
+     checkIsr(allServers)
+   } finally {
+     // shutdown adHoc brokers even when a step above throws, so a failing run
+     // does not leave brokers running for the tests that follow
+     adHocServers.foreach(p => p.shutdown())
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index c96c0ff..b9de8d6 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -70,6 +70,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
EasyMock.replay(replicaManager)
+ ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath)
+
val replicas = List(0,1)
// regular update