You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/14 17:17:35 UTC
svn commit: r1350291 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/cluster/ main/scala/kafka/consumer/
main/scala/kafka/network/ main/scala/kafka/producer/
main/scala/kafka/server/ main/scala/kafka/utils/
test/scala/unit/kafka/controlle...
Author: junrao
Date: Thu Jun 14 15:17:35 2012
New Revision: 1350291
URL: http://svn.apache.org/viewvc?rev=1350291&view=rev
Log:
embedded controller; patched by Yang Ye; reviewed by Jun Rao; KAFKA-335
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Thu Jun 14 15:17:35 2012
@@ -39,7 +39,7 @@ class Partition(val topic: String,
try {
leaderISRUpdateLock.lock()
if(newLeader.isDefined) {
- info("Updating leader for for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
+ info("Updating leader for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
leaderReplicaId = newLeader
}
leaderReplicaId
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Jun 14 15:17:35 2012
@@ -31,7 +31,7 @@ class SimpleConsumer( val host: String,
val bufferSize: Int ) extends Logging {
private val lock = new Object()
- private val blockingChannel = new BlockingChannel(host, port, bufferSize, 0, soTimeout)
+ private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
private def connect(): BlockingChannel = {
close
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Thu Jun 14 15:17:35 2012
@@ -22,6 +22,11 @@ import java.nio.channels._
import kafka.utils.{nonthreadsafe, Logging}
import kafka.api.RequestOrResponse
+
+/**
+ * Constants shared by code that constructs a BlockingChannel.
+ */
+object BlockingChannel{
+  // Sentinel passed as the read and/or write buffer size argument of BlockingChannel.
+  // NOTE(review): presumably tells the channel to keep the socket's default buffer size;
+  // the handling code is not part of this hunk - confirm against BlockingChannel.connect().
+  val UseDefaultBufferSize: Int = -1
+}
+
/**
* A simple blocking channel with timeouts correctly enabled.
*
@@ -32,7 +37,6 @@ class BlockingChannel( val host: String,
val readBufferSize: Int,
val writeBufferSize: Int,
val readTimeoutMs: Int ) extends Logging {
-
private var connected = false
private var channel: SocketChannel = null
private var readChannel: ReadableByteChannel = null
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Thu Jun 14 15:17:35 2012
@@ -42,7 +42,7 @@ class SyncProducer(val config: SyncProdu
private val lock = new Object()
@volatile private var shutdown: Boolean = false
- private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs)
+ private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.bufferSize, config.socketTimeoutMs)
trace("Instantiating Scala Sync Producer")
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu Jun 14 15:17:35 2012
@@ -99,6 +99,13 @@ class KafkaConfig(props: Properties) ext
* Following properties are relevant to Kafka replication
*/
+ /* the socket timeout for controller-to-broker channels */
+ val controllerSocketTimeoutMs = Utils.getInt(props, "controller.socket.timeout.ms", 30000)
+
+ /* the buffer size for controller-to-broker-channels */
+ val controllerMessageQueueSize= Utils.getInt(props, "controller.message.queue.size", 10)
+
+
/* default replication factors for automatically created topics */
val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1350291&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Thu Jun 14 15:17:35 2012
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.common.KafkaZookeeperClient
+import collection.mutable.HashMap
+import collection.immutable.Set
+import kafka.cluster.Broker
+import kafka.api._
+import java.lang.Object
+import kafka.network.{Receive, BlockingChannel}
+import kafka.utils.{ZkUtils, Logging}
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import java.util.concurrent.atomic.AtomicBoolean
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+
+/**
+ * Thread that drains the request queue of a single broker: it takes one
+ * (request, callback) pair at a time, sends the request over the broker's channel,
+ * reads the response and hands it to the callback (if one was supplied).
+ *
+ * @param brokerId id of the broker this thread talks to (only used for naming and logging)
+ * @param queue    pending (request, callback) pairs; a null callback means "ignore the response"
+ * @param channel  channel used for the send/receive round trip
+ */
+class RequestSendThread(val brokerId: Int,
+                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                        val channel: BlockingChannel)
+  extends Thread("requestSendThread-" + brokerId) with Logging {
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  // released by run() when the thread exits, whatever the reason
+  private val shutDownLatch = new CountDownLatch(1)
+  private val lock = new Object
+
+  /**
+   * Asks the thread to stop and blocks until run() has returned.
+   * The latch is only released by run(), so this must not be called on a thread
+   * that was never started.
+   */
+  def shutDown(): Unit = {
+    info("Shutting down controller request send thread to broker %d".format(brokerId))
+    isRunning.set(false)
+    // wake the thread up if it is parked in queue.take()
+    interrupt()
+    shutDownLatch.await()
+    info("Controller request send thread to broker %d shutting down completed".format(brokerId))
+  }
+
+  /**
+   * Main loop. Any exception (including an unknown request id, which surfaces as a
+   * MatchError) terminates the thread after being logged.
+   */
+  override def run(): Unit = {
+    try {
+      info("In controller, thread for broker: " + brokerId + " started running")
+      while(isRunning.get()) {
+        val (request, callback) = queue.take()
+
+        // one request/response round trip at a time on the channel
+        val receive: Receive = lock synchronized {
+          channel.send(request)
+          channel.receive()
+        }
+
+        // decode the response according to the kind of request that was sent
+        val response: RequestOrResponse = request.requestId.get match {
+          case RequestKeys.LeaderAndISRRequest =>
+            LeaderAndISRResponse.readFrom(receive.buffer)
+          case RequestKeys.StopReplicaRequest =>
+            StopReplicaResponse.readFrom(receive.buffer)
+        }
+        if(callback != null)
+          callback(response)
+      }
+    } catch {
+      case e: InterruptedException => warn("Controller request send thread to broker %d is intterrupted. Shutting down".format(brokerId))
+      case e1 => error("Error in controller request send thread to broker %d down due to ".format(brokerId), e1)
+    } finally {
+      // in a finally block so shutDown() can never hang, even if a handler above throws
+      shutDownLatch.countDown()
+    }
+  }
+}
+
+/**
+ * Owns, for every known broker, one connected BlockingChannel, one bounded request
+ * queue and (once started) one RequestSendThread that drains the queue.
+ *
+ * The constructor connects a channel to every broker in allBrokers; the sender
+ * threads are only created by startUp() (or by addBroker() for brokers added later).
+ * The class does no locking of its own.
+ *
+ * @param allBrokers brokers to connect to at construction time
+ * @param config     supplies controllerSocketTimeoutMs and controllerMessageQueueSize
+ */
+class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{
+  private val brokers = new HashMap[Int, Broker]
+  private val messageChannels = new HashMap[Int, BlockingChannel]
+  private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
+  private val messageThreads = new HashMap[Int, RequestSendThread]
+  for(broker <- allBrokers)
+    registerBroker(broker)
+
+  /** Starts a sender thread for every registered broker that does not have one yet. */
+  def startUp() = {
+    for(brokerId <- brokers.keys.toList)
+      startRequestSendThread(brokerId)
+  }
+
+  /** Stops the sender thread and closes the channel of every registered broker. */
+  def shutDown() = {
+    // iterate over a snapshot: removeBroker() mutates `brokers`
+    for(brokerId <- brokers.keys.toList)
+      removeBroker(brokerId)
+  }
+
+  /**
+   * Enqueues a request for the given broker, blocking while that broker's queue is full.
+   *
+   * @param callback invoked with the decoded response by the sender thread; may be null
+   * @throws NoSuchElementException if the broker id is not registered
+   */
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null){
+    messageQueues(brokerId).put((request, callback))
+  }
+
+  /** Connects to a new broker and immediately starts its sender thread. */
+  def addBroker(broker: Broker){
+    registerBroker(broker)
+    startRequestSendThread(broker.id)
+  }
+
+  /**
+   * Forgets a broker: stops its sender thread (if any), then closes its channel and
+   * drops its queue. Unknown broker ids are ignored.
+   */
+  def removeBroker(brokerId: Int){
+    brokers.remove(brokerId)
+    // stop the thread first so the channel is never closed underneath an in-flight request
+    messageThreads.remove(brokerId).foreach(_.shutDown())
+    messageChannels.remove(brokerId).foreach(_.disconnect())
+    messageQueues.remove(brokerId)
+  }
+
+  /** Connects a channel to the broker and records the broker, its channel and a fresh queue. */
+  private def registerBroker(broker: Broker){
+    info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
+    val channel = new BlockingChannel(broker.host, broker.port,
+                                      BlockingChannel.UseDefaultBufferSize,
+                                      BlockingChannel.UseDefaultBufferSize,
+                                      config.controllerSocketTimeoutMs)
+    // connect before touching the maps so a failed connect leaves no half-registered broker
+    channel.connect()
+    brokers.put(broker.id, broker)
+    messageChannels.put(broker.id, channel)
+    messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+  }
+
+  /** Creates and starts the non-daemon sender thread of a registered broker, unless it already has one. */
+  private def startRequestSendThread(brokerId: Int){
+    if(!messageThreads.contains(brokerId)){
+      val thread = new RequestSendThread(brokerId, messageQueues(brokerId), messageChannels(brokerId))
+      thread.setDaemon(false)
+      thread.start()
+      messageThreads.put(brokerId, thread)
+    }
+  }
+}
+
+/**
+ * Controller embedded in every broker. Each broker tries to create the ephemeral
+ * ZkUtils.ControllerPath node; the one that succeeds opens channels to all brokers
+ * (ControllerChannelManager) and watches broker/topic changes. The others watch the
+ * controller node and retry when it is deleted.
+ *
+ * State shared with the ZooKeeper listeners is guarded by `lock`.
+ *
+ * @param config broker configuration (broker id, ZooKeeper and controller channel settings)
+ */
+class KafkaController(config : KafkaConfig) extends Logging {
+  info("controller startup");
+  private val lock = new Object
+
+  private var zkClient: ZkClient = null
+  // non-null only while this broker is the active controller; volatile because
+  // isActive() is called from threads that do not hold `lock`
+  @volatile private var controllerChannelManager: ControllerChannelManager = null
+  private var allBrokers : Set[Broker] = null
+  private var allTopics: Set[String] = null
+
+  // One instance per listener kind, reused on every (re-)subscription, instead of a fresh
+  // instance per election attempt.
+  // NOTE(review): relies on ZkClient keeping listeners in a set so that subscribing the
+  // same instance twice is a no-op - confirm against the zkclient version in use.
+  private val brokerChangeListener = new BrokerChangeListener()
+  private val topicChangeListener = new TopicChangeListener()
+  private val sessionExpireListener = new SessionExpireListener()
+  private val controllerExistListener = new ControllerExistListener()
+
+  /**
+   * Tries to create the controller node. On success this broker sets up the broker/topic
+   * listeners and the channel manager; if another broker won the race it only (re-)watches
+   * the controller node. Any other exception propagates to the caller.
+   */
+  private def tryToBecomeController() = {
+    lock synchronized {
+      val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+      if (curController == null){
+        try {
+          ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString())
+
+          // Only the broker successfully registering as the controller can execute following code, otherwise
+          // some exception will be thrown.
+          registerBrokerChangeListener()
+          registerTopicChangeListener()
+          allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+          allTopics = ZkUtils.getAllTopics(zkClient).toSet
+          val channelManager = new ControllerChannelManager(allBrokers, config)
+          channelManager.startUp()
+          // publish only once fully started, so isActive() never exposes a half-built manager
+          controllerChannelManager = channelManager
+        } catch {
+          case e: ZkNodeExistsException =>
+            registerControllerExistListener()
+            info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else")
+        }
+      }
+      else info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController)
+    }
+  }
+
+  /** @return true while this broker holds a channel manager, i.e. acts as the controller */
+  def isActive(): Boolean = {
+    controllerChannelManager != null
+  }
+
+  /** Connects to ZooKeeper, installs the session and controller-node listeners and joins the election. */
+  def startup() = {
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    registerSessionExpirationListener()
+    registerControllerExistListener()
+    tryToBecomeController()
+  }
+
+  /**
+   * Stops the channel manager (if this broker is the controller) and closes the ZooKeeper
+   * client. Safe to call when startup() was never invoked or did not complete.
+   */
+  def shutDown() = {
+    lock synchronized {
+      if(controllerChannelManager != null){
+        controllerChannelManager.shutDown()
+        controllerChannelManager = null
+      }
+    }
+    // closed outside the lock so ZooKeeper listener callbacks waiting for `lock` are not blocked by the close
+    if(zkClient != null)
+      zkClient.close()
+  }
+
+  /**
+   * Queues a request for the given broker. Must only be called while isActive() is true.
+   *
+   * @param callback invoked with the broker's response; may be null
+   */
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
+    controllerChannelManager.sendRequest(brokerId, request, callback)
+  }
+
+  private def registerBrokerChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+  }
+
+  private def registerTopicChangeListener() = {
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+  }
+
+  private def registerSessionExpirationListener() = {
+    zkClient.subscribeStateChanges(sessionExpireListener)
+  }
+
+  private def registerControllerExistListener(){
+    zkClient.subscribeDataChanges(ZkUtils.ControllerPath, controllerExistListener)
+  }
+
+  class SessionExpireListener() extends IZkStateListener {
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created. You would have to re-create
+     * any ephemeral nodes here.
+     *
+     * @throws Exception
+     *             On any error.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      lock synchronized {
+        info("Controller session expires, clean up the state, current controller: " + config.brokerId)
+        // this listener is installed on every broker, but only the controller has a channel manager
+        if(controllerChannelManager != null){
+          controllerChannelManager.shutDown()
+          controllerChannelManager = null
+          info("Controller session expires, the channel manager shut downr: " + config.brokerId)
+        }
+        tryToBecomeController()
+      }
+    }
+  }
+
+  class BrokerChangeListener() extends IZkChildListener with Logging {
+    /** Diffs the current children of the broker ids path against allBrokers and opens/closes channels accordingly. */
+    def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
+      import scala.collection.JavaConversions._
+      lock synchronized {
+        info("Broker change listener at controller triggerred")
+        if(controllerChannelManager == null){
+          // the subscription outlives controllership (e.g. after a session expiration)
+          info("Broker " + config.brokerId + " is not the active controller, ignoring broker change")
+        } else {
+          val allBrokerIds = allBrokers.map(_.id)
+          val curChildrenSeq: Seq[String] = javaCurChildren
+          val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
+          val curBrokerIds = curBrokerIdsSeq.toSet
+          val addedBrokerIds = curBrokerIds -- allBrokerIds
+          val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
+          info("Added brokers: " + addedBrokerIds.toString())
+          val deletedBrokerIds = allBrokerIds -- curBrokerIds
+          info("Deleted brokers: " + deletedBrokerIds.toString())
+
+          allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
+
+          for(broker <- addedBrokersSeq){
+            controllerChannelManager.addBroker(broker)
+          }
+          for (brokerId <- deletedBrokerIds){
+            controllerChannelManager.removeBroker(brokerId)
+          }
+          /** TODO: add other broker change handler logic**/
+        }
+      }
+    }
+  }
+
+  class TopicChangeListener extends IZkChildListener with Logging {
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      // TODO: Incomplete, do not need to review this time
+    }
+  }
+
+  class ControllerExistListener extends IZkDataListener with Logging {
+    @throws(classOf[Exception])
+    def handleDataChange(dataPath: String, data: Object) {
+      // do nothing, since No logic is needed here
+    }
+
+    /** The controller node disappeared: the previous controller is gone, so join the election. */
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+      info("Controller fail over, broker " + config.brokerId + " try to become controller")
+      tryToBecomeController()
+    }
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Jun 14 15:17:35 2012
@@ -42,13 +42,14 @@ class KafkaServer(val config: KafkaConfi
var kafkaZookeeper: KafkaZooKeeper = null
private var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
+ var kafkaController: KafkaController = new KafkaController(config)
/**
* Start up API for bringing up a single instance of the Kafka server.
* Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
*/
def startup() {
- info("Starting Kafka server...")
+ info("Starting Kafka server..." + config.brokerId)
isShuttingDown = new AtomicBoolean(false)
shutdownLatch = new CountDownLatch(1)
var needRecovery = true
@@ -89,6 +90,8 @@ class KafkaServer(val config: KafkaConfi
// starting relevant replicas and leader election for partitions assigned to this broker
kafkaZookeeper.startup
+ kafkaController.startup()
+
info("Server started.")
}
@@ -110,13 +113,16 @@ class KafkaServer(val config: KafkaConfi
Utils.unregisterMBean(statsMBeanName)
if(logManager != null)
logManager.close()
+ if(kafkaController != null)
+ kafkaController.shutDown()
+
kafkaZookeeper.close
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
cleanShutDownFile.createNewFile
shutdownLatch.countDown()
- info("Kafka server shut down completed")
+ info("Kafka server with id %d shut down completed".format(config.brokerId))
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Jun 14 15:17:35 2012
@@ -32,6 +32,7 @@ object ZkUtils extends Logging {
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
val BrokerStatePath = "/brokers/state"
+ val ControllerPath = "/controller"
def getTopicPath(topic: String): String ={
BrokerTopicsPath + "/" + topic
@@ -41,10 +42,19 @@ object ZkUtils extends Logging {
getTopicPath(topic) + "/partitions"
}
+  /**
+   * Reads the broker id stored at ControllerPath.
+   *
+   * @return the id of the broker currently registered as controller
+   * @throws IllegalStateException if nothing is stored at ControllerPath
+   * @throws NumberFormatException if the stored value is not an integer
+   */
+  def getController(zkClient: ZkClient): Int= {
+    val controller = readDataMaybeNull(zkClient, ControllerPath)
+    // readDataMaybeNull yields null when no controller is registered; fail with a clear
+    // message instead of a bare NullPointerException from toInt
+    if(controller == null)
+      throw new IllegalStateException("No controller is registered at " + ControllerPath)
+    controller.toInt
+  }
+
def getTopicPartitionPath(topic: String, partitionId: String): String ={
getTopicPartitionsPath(topic) + "/" + partitionId
}
+  /**
+   * Builds the path of the "leaderAndISR" child under the path returned by
+   * getTopicPartitionPath for the given topic and partition.
+   */
+  def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String ={
+    val partitionPath = getTopicPartitionPath(topic, partitionId)
+    List(partitionPath, "leaderAndISR").mkString("/")
+  }
+
def getTopicVersion(zkClient: ZkClient, topic: String): String ={
readDataMaybeNull(zkClient, getTopicPath(topic))
}
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala?rev=1350291&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala Thu Jun 14 15:17:35 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.controller
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import junit.framework.Assert._
+import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.api._
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicInteger
+import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils}
+
+
+/**
+ * Starts four brokers against the ZooKeeper test harness and checks controller
+ * fail-over and the controller-to-broker request channel.
+ */
+class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val props = createBrokerConfigs(4)
+  val configs = props.map(p => new KafkaConfig(p))
+  var brokers: Seq[KafkaServer] = null
+
+  override def setUp() {
+    super.setUp()
+    brokers = configs.map(config => TestUtils.createServer(config))
+  }
+
+  override def tearDown() {
+    // stop the brokers while the harness is still up, then tear the harness down
+    // NOTE(review): assumes ZooKeeperTestHarness.tearDown() stops the embedded ZooKeeper - confirm
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+  /** With only broker 2 left it must be the controller; after 1 restarts and 2 stops, broker 1 takes over. */
+  def testControllerFailOver(){
+    brokers(0).shutdown()
+    brokers(1).shutdown()
+    brokers(3).shutdown()
+    Thread.sleep(1000)
+
+    var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+    // JUnit convention: expected value first, so failure messages read correctly
+    assertEquals("2", curController)
+
+    brokers(1).startup()
+    brokers(2).shutdown()
+    Thread.sleep(1000)
+    curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+    assertEquals("1", curController)
+  }
+
+  /** The active controller sends one LeaderAndISR and one StopReplica request to every broker and checks each response. */
+  def testControllerCommandSend(){
+    Thread.sleep(1000)
+    val brokerIds = configs.map(_.brokerId)
+    for(broker <- brokers){
+      if(broker.kafkaController.isActive){
+        val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
+        val stopReplicaRequest = ControllerTestUtils.createSampleStopReplicaRequest()
+
+        // two requests per broker
+        val expectedResponses = 2 * brokerIds.size
+        val successCount: AtomicInteger = new AtomicInteger(0)
+        val countDownLatch: CountDownLatch = new CountDownLatch(expectedResponses)
+
+        def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
+          val expectedResponse = ControllerTestUtils.createSampleLeaderAndISRResponse()
+          if(response.equals(expectedResponse))
+            successCount.addAndGet(1)
+          countDownLatch.countDown()
+        }
+
+        def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
+          val expectedResponse = ControllerTestUtils.createSampleStopReplicaResponse()
+          if(response.equals(expectedResponse))
+            successCount.addAndGet(1)
+          countDownLatch.countDown()
+        }
+
+        for(brokerId <- brokerIds)
+          broker.kafkaController.sendRequest(brokerId, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+        for(brokerId <- brokerIds)
+          broker.kafkaController.sendRequest(brokerId, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+
+        // bounded wait: a lost response fails the test instead of hanging the build
+        assertTrue("Timed out waiting for controller responses",
+                   countDownLatch.await(30, java.util.concurrent.TimeUnit.SECONDS))
+
+        assertEquals(expectedResponses, successCount.get())
+      }
+    }
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Jun 14 15:17:35 2012
@@ -31,10 +31,13 @@ import org.I0Itec.zkclient.ZkClient
import kafka.cluster.Broker
import collection.mutable.ListBuffer
import kafka.consumer.ConsumerConfig
-import kafka.api.{TopicData, PartitionData}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kafka.serializer.{DefaultEncoder, Encoder}
+import kafka.common.ErrorMapping
+import kafka.api._
+import collection.mutable.{Map, Set}
+
/**
* Utility functions to help with testing
@@ -396,6 +399,52 @@ object TestUtils extends Logging {
}
+/**
+ * Factory methods for the fixed sample controller requests/responses used by the
+ * controller tests. All samples cover partitions 1 and 2 of topics "test1" and "test2".
+ */
+object ControllerTestUtils{
+  /** Sample LeaderAndISR request: "test1" partitions led by replica 1, "test2" partitions led by replica 2. */
+  def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
+    val (firstTopic, secondTopic) = ("test1", "test2")
+
+    val firstLeaderAndISR = new LeaderAndISR(1, 1, List(1, 2, 3), 1)
+    val secondLeaderAndISR = new LeaderAndISR(2, 1, List(2, 3, 4), 2)
+
+    val leaderAndISRByPartition = Map(
+      (firstTopic, 1) -> firstLeaderAndISR,
+      (firstTopic, 2) -> firstLeaderAndISR,
+      (secondTopic, 1) -> secondLeaderAndISR,
+      (secondTopic, 2) -> secondLeaderAndISR)
+    new LeaderAndISRRequest(1, "client 1", 1, 4, leaderAndISRByPartition)
+  }
+
+  /** Response expected for the sample LeaderAndISR request: NoError for every partition. */
+  def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
+    val partitions = List(("test1", 1), ("test1", 2), ("test2", 1), ("test2", 2))
+    val errorByPartition = Map(partitions.map(partition => (partition, ErrorMapping.NoError)): _*)
+    new LeaderAndISRResponse(1, errorByPartition)
+  }
+
+
+  /** Sample StopReplica request covering the same four partitions. */
+  def createSampleStopReplicaRequest() : StopReplicaRequest = {
+    val partitions = List(("test1", 1), ("test1", 2), ("test2", 1), ("test2", 2))
+    new StopReplicaRequest(1, "client 1", 1000, Set(partitions: _*))
+  }
+
+  /** Response expected for the sample StopReplica request: NoError for every partition. */
+  def createSampleStopReplicaResponse() : StopReplicaResponse = {
+    val partitions = List(("test1", 1), ("test1", 2), ("test2", 1), ("test2", 2))
+    val errorByPartition = Map(partitions.map(partition => (partition, ErrorMapping.NoError)): _*)
+    new StopReplicaResponse(1, errorByPartition)
+  }
+}
+
+
+
+
object TestZKUtils {
val zookeeperConnect = "127.0.0.1:2182"
}