You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/07/07 18:48:42 UTC

kafka git commit: kafka-1367; Broker topic metadata not kept in sync with ZooKeeper; patched by Ashish Singh; reviewed by Jun Rao

Repository: kafka
Updated Branches:
  refs/heads/trunk 271b18d11 -> f77dc386c


kafka-1367; Broker topic metadata not kept in sync with ZooKeeper; patched by Ashish Singh; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f77dc386
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f77dc386
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f77dc386

Branch: refs/heads/trunk
Commit: f77dc386c099da5ff0bac4d2a12b04f7f17f07d3
Parents: 271b18d
Author: Ashish Singh <as...@cloudera.com>
Authored: Tue Jul 7 09:45:26 2015 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Jul 7 09:45:45 2015 -0700

----------------------------------------------------------------------
 .../scala/kafka/common/TopicAndPartition.scala  |  6 +-
 .../kafka/controller/KafkaController.scala      | 69 ++++++++++++++--
 .../scala/kafka/utils/ReplicationUtils.scala    | 16 +++-
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  1 +
 .../kafka/integration/TopicMetadataTest.scala   | 84 +++++++++++++++++---
 .../unit/kafka/utils/ReplicationUtilsTest.scala |  2 +
 6 files changed, 158 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/common/TopicAndPartition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/TopicAndPartition.scala b/core/src/main/scala/kafka/common/TopicAndPartition.scala
index df3db91..13a3f28 100644
--- a/core/src/main/scala/kafka/common/TopicAndPartition.scala
+++ b/core/src/main/scala/kafka/common/TopicAndPartition.scala
@@ -1,6 +1,7 @@
 package kafka.common
 
 import kafka.cluster.{Replica, Partition}
+import kafka.utils.Json
 
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
@@ -24,6 +25,8 @@ import kafka.cluster.{Replica, Partition}
  */
 case class TopicAndPartition(topic: String, partition: Int) {
 
+  private val version: Long = 1L
+
   def this(tuple: (String, Int)) = this(tuple._1, tuple._2)
 
   def this(partition: Partition) = this(partition.topic, partition.partitionId)
@@ -33,5 +36,6 @@ case class TopicAndPartition(topic: String, partition: Int) {
   def asTuple = (topic, partition)
 
   override def toString = "[%s,%d]".format(topic, partition)
-}
 
+  def toJson = Json.encode(Map("version" -> version, "topic" -> topic, "partition" -> partition))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 3635057..09630d0 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -16,8 +16,9 @@
  */
 package kafka.controller
 
-import collection._
-import collection.Set
+import java.util
+
+import scala.collection._
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
 import kafka.admin.AdminUtils
@@ -31,7 +32,7 @@ import kafka.utils.ZkUtils._
 import kafka.utils._
 import kafka.utils.CoreUtils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkDataListener, IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkChildListener, IZkDataListener, IZkStateListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
@@ -169,6 +170,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
 
   private val partitionReassignedListener = new PartitionsReassignedListener(this)
   private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
+  private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)
 
   newGauge(
     "ActiveControllerCount",
@@ -307,6 +309,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
       incrementControllerEpoch(zkClient)
       // before reading source of truth from zookeeper, register the listeners to get broker/topic callbacks
       registerReassignedPartitionsListener()
+      registerIsrChangeNotificationListener()
       registerPreferredReplicaElectionListener()
       partitionStateMachine.registerListeners()
       replicaStateMachine.registerListeners()
@@ -792,8 +795,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
     controllerContext.controllerChannelManager.startup()
   }
 
-  private def updateLeaderAndIsrCache() {
-    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, controllerContext.partitionReplicaAssignment.keySet)
+  def updateLeaderAndIsrCache(topicAndPartitions: Set[TopicAndPartition] = controllerContext.partitionReplicaAssignment.keySet) {
+    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topicAndPartitions)
     for((topicPartition, leaderIsrAndControllerEpoch) <- leaderAndIsrInfo)
       controllerContext.partitionLeadershipInfo.put(topicPartition, leaderIsrAndControllerEpoch)
   }
@@ -892,6 +895,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient, val brokerSt
     zkClient.subscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
   }
 
+  private def registerIsrChangeNotificationListener() = { // subscribes isrChangeNotificationListener to child changes of ZkUtils.IsrChangeNotificationPath
+    debug("Registering IsrChangeNotificationListener")
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath) // NOTE(review): going by its name, creates the znode when absent - confirm
+    zkClient.subscribeChildChanges(ZkUtils.IsrChangeNotificationPath, isrChangeNotificationListener) // same path as the call above
+  }
+
   private def deregisterReassignedPartitionsListener() = {
     zkClient.unsubscribeDataChanges(ZkUtils.ReassignPartitionsPath, partitionReassignedListener)
   }
@@ -1281,6 +1290,56 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
 }
 
 /**
+ * Called when the leader notifies the controller of an ISR change
+ * @param controller
+ */
+class IsrChangeNotificationListener(controller: KafkaController) extends IZkChildListener with Logging {
+  var topicAndPartitionSet: Set[TopicAndPartition] = Set()
+
+  // Refreshes the controller's leader/ISR cache for the notified partitions, sends the update out, then deletes the notifications
+  override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
+    import scala.collection.JavaConverters._
+    inLock(controller.controllerContext.controllerLock) {
+      debug("[IsrChangeNotificationListener] Fired!!!")
+      val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
+      val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.map(x => getTopicAndPartition(x)).flatten.toSet
+      controller.updateLeaderAndIsrCache(topicAndPartitions)
+      processUpdateNotifications(topicAndPartitions)
+      // delete processed children from IsrChangeNotificationPath, the parent they were read from (foreach: side effect only)
+      childrenAsScala.foreach(x => ZkUtils.deletePath(controller.controllerContext.zkClient, ZkUtils.IsrChangeNotificationPath + "/" + x))
+    }
+  }
+
+  // Calls controller.sendUpdateMetadataRequest for the given partitions, addressed to every live or shutting-down broker id
+  private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
+    val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
+    controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
+    debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
+  }
+
+  // Reads one notification znode; returns None when the znode holds no data or its data does not parse as JSON
+  private def getTopicAndPartition(child: String): Option[TopicAndPartition] = {
+    val changeZnode: String = ZkUtils.IsrChangeNotificationPath + "/" + child
+    val (jsonOpt, stat) = ZkUtils.readDataMaybeNull(controller.controllerContext.zkClient, changeZnode)
+    if (jsonOpt.isDefined) {
+      val json = Json.parseFull(jsonOpt.get)
+      json match {
+        case Some(m) =>
+          val topicAndPartition = m.asInstanceOf[Map[String, Any]]
+          val topic = topicAndPartition("topic").asInstanceOf[String]
+          val partition = topicAndPartition("partition").asInstanceOf[Int]
+          Some(TopicAndPartition(topic, partition))
+        case None =>
+          error("Invalid topic and partition JSON: " + jsonOpt.get + " in ZK: " + changeZnode) // log the raw data; json is None here
+          None
+      }
+    } else {
+      None
+    }
+  }
+}
+
+/**
  * Starts the preferred replica leader election for the list of partitions specified under
  * /admin/preferred_replica_election -
  */

http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/utils/ReplicationUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ReplicationUtils.scala b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
index 6068733..783ba10 100644
--- a/core/src/main/scala/kafka/utils/ReplicationUtils.scala
+++ b/core/src/main/scala/kafka/utils/ReplicationUtils.scala
@@ -18,22 +18,32 @@
 package kafka.utils
 
 import kafka.api.LeaderAndIsr
+import kafka.common.TopicAndPartition
 import kafka.controller.LeaderIsrAndControllerEpoch
-import org.apache.zookeeper.data.Stat
 import org.I0Itec.zkclient.ZkClient
+import org.apache.zookeeper.data.Stat
 
-import scala.Some
 import scala.collection._
 
 object ReplicationUtils extends Logging {
 
+  val IsrChangeNotificationPrefix = "isr_change_"
+
   def updateLeaderAndIsr(zkClient: ZkClient, topic: String, partitionId: Int, newLeaderAndIsr: LeaderAndIsr, controllerEpoch: Int,
     zkVersion: Int): (Boolean,Int) = {
     debug("Updated ISR for partition [%s,%d] to %s".format(topic, partitionId, newLeaderAndIsr.isr.mkString(",")))
     val path = ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId)
     val newLeaderData = ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch)
     // use the epoch of the controller that made the leadership decision, instead of the current controller epoch
-    ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+    val updatePersistentPath: (Boolean, Int) = ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newLeaderData, zkVersion, Some(checkLeaderAndIsrZkData))
+    if (updatePersistentPath._1) {
+      val topicAndPartition: TopicAndPartition = TopicAndPartition(topic, partitionId)
+      val isrChangeNotificationPath: String = ZkUtils.createSequentialPersistentPath(
+        zkClient, ZkUtils.IsrChangeNotificationPath + "/" + IsrChangeNotificationPrefix,
+        topicAndPartition.toJson)
+      debug("Added " + isrChangeNotificationPath + " for " + topicAndPartition)
+    }
+    updatePersistentPath
   }
 
   def checkLeaderAndIsrZkData(zkClient: ZkClient, path: String, expectedLeaderAndIsrInfo: String): (Boolean,Int) = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 78475e3..166814c 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -47,6 +47,7 @@ object ZkUtils extends Logging {
   val DeleteTopicsPath = "/admin/delete_topics"
   val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election"
   val BrokerSequenceIdPath = "/brokers/seqid"
+  val IsrChangeNotificationPath = "/isr_change_notification"
 
   def getTopicPath(topic: String): String = {
     BrokerTopicsPath + "/" + topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 995b059..a95ee5e 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -17,28 +17,32 @@
 
 package kafka.integration
 
-import org.apache.kafka.common.protocol.SecurityProtocol
-import org.scalatest.junit.JUnit3Suite
-import kafka.zk.ZooKeeperTestHarness
-import kafka.admin.AdminUtils
 import java.nio.ByteBuffer
+
 import junit.framework.Assert._
-import kafka.cluster.{BrokerEndPoint, Broker}
+import kafka.admin.AdminUtils
+import kafka.api.{TopicMetadataResponse, TopicMetadataRequest}
+import kafka.client.ClientUtils
+import kafka.cluster.{Broker, BrokerEndPoint}
+import kafka.common.ErrorMapping
+import kafka.server.{NotRunning, KafkaConfig, KafkaServer}
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
-import kafka.server.{KafkaServer, KafkaConfig}
-import kafka.api.TopicMetadataRequest
-import kafka.common.ErrorMapping
-import kafka.client.ClientUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.scalatest.junit.JUnit3Suite
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   private var server1: KafkaServer = null
   var brokerEndPoints: Seq[BrokerEndPoint] = null
+  var adHocConfigs: Seq[KafkaConfig] = null
+  val numConfigs: Int = 2
 
   override def setUp() {
     super.setUp()
-    val props = createBrokerConfigs(1, zkConnect)
-    val configs = props.map(KafkaConfig.fromProps)
+    val props = createBrokerConfigs(numConfigs, zkConnect)
+    val configs: Seq[KafkaConfig] = props.map(KafkaConfig.fromProps)
+    adHocConfigs = configs.takeRight(configs.size - 1) // Started and stopped by individual test cases
     server1 = TestUtils.createServer(configs.head)
     brokerEndPoints = Seq(new Broker(server1.config.brokerId, server1.config.hostName, server1.boundPort()).getBrokerEndPoint(SecurityProtocol.PLAINTEXT))
   }
@@ -130,4 +134,62 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertEquals(1, partitionMetadata.head.replicas.size)
     assertTrue(partitionMetadata.head.leader.isDefined)
   }
+
+  /**
+   * For every server in `servers` whose broker state is not NotRunning, polls its topic metadata until the ISR
+   * of the first partition of the first topic equals the set of those servers' endpoints. The message handed
+   * to waitUntilTrue names the broker and carries the expected and the actual ISR.
+   */
+  private def checkIsr(servers: Seq[KafkaServer]): Unit = {
+    def endPointOf(server: KafkaServer): BrokerEndPoint = {
+      val host = if (server.config.hostName.nonEmpty) server.config.hostName else "localhost"
+      new BrokerEndPoint(server.config.brokerId, host, server.boundPort())
+    }
+
+    val activeBrokers: Seq[KafkaServer] = servers.filter(x => x.brokerState.currentState != NotRunning.state)
+    val expectedIsr: Seq[BrokerEndPoint] = activeBrokers.map(endPointOf)
+
+    // Assert that topic metadata at new brokers is updated correctly
+    activeBrokers.foreach(x => {
+      var metadata: TopicMetadataResponse = new TopicMetadataResponse(Seq(), Seq(), -1)
+      // ISR of the first partition of the first topic in the most recent response, when there is one
+      def actualIsr =
+        metadata.topicsMetadata.headOption.flatMap(_.partitionsMetadata.headOption).map(_.isr)
+      waitUntilTrue(() => {
+        metadata = ClientUtils.fetchTopicMetadata(
+                                Set.empty,
+                                Seq(endPointOf(x)),
+                                "TopicMetadataTest-testBasicTopicMetadata",
+                                2000, 0)
+        // compare as sets: membership of the ISR is under test, not the order of the replicas inside it
+        actualIsr.exists(isr => isr.toSet == expectedIsr.toSet)
+      },
+        "Topic metadata is not correctly updated for broker " + x + ".\n" +
+        "Expected ISR: " + expectedIsr + "\n" +
+        "Actual ISR  : " + actualIsr.getOrElse(""))
+    })
+  }
+
+
+  // Creates topic "test", shuts down and restarts the last adHoc broker, then runs checkIsr over all servers.
+  def testIsrAfterBrokerShutDownAndJoinsBack {
+    // start adHoc brokers
+    val adHocServers = adHocConfigs.map(p => createServer(p))
+    val allServers: Seq[KafkaServer] = Seq(server1) ++ adHocServers
+    try {
+      // create topic
+      val topic: String = "test"
+      AdminUtils.createTopic(zkClient, topic, 1, numConfigs)
+
+      // shutdown a broker
+      adHocServers.last.shutdown()
+      adHocServers.last.awaitShutdown()
+
+      // startup a broker
+      adHocServers.last.startup()
+      // check metadata is still correct and updated at all brokers
+      checkIsr(allServers)
+    } finally {
+      adHocServers.foreach(p => p.shutdown()) // shutdown adHoc brokers, also when a step above failed
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f77dc386/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index c96c0ff..b9de8d6 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -70,6 +70,8 @@ class ReplicationUtilsTest extends JUnit3Suite with ZooKeeperTestHarness {
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
     EasyMock.replay(replicaManager)
 
+    ZkUtils.makeSurePersistentPathExists(zkClient, ZkUtils.IsrChangeNotificationPath)
+
     val replicas = List(0,1)
 
     // regular update