You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/06/14 17:17:35 UTC

svn commit: r1350291 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/cluster/ main/scala/kafka/consumer/ main/scala/kafka/network/ main/scala/kafka/producer/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/controlle...

Author: junrao
Date: Thu Jun 14 15:17:35 2012
New Revision: 1350291

URL: http://svn.apache.org/viewvc?rev=1350291&view=rev
Log:
embedded controller; patched by Yang Ye; reviewed by Jun Rao; KAFKA-335

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Thu Jun 14 15:17:35 2012
@@ -39,7 +39,7 @@ class Partition(val topic: String,
     try {
       leaderISRUpdateLock.lock()
       if(newLeader.isDefined) {
-        info("Updating leader for for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
+        info("Updating leader for topic %s partition %d to replica %d".format(topic, partitionId, newLeader.get))
         leaderReplicaId = newLeader
       }
       leaderReplicaId

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Jun 14 15:17:35 2012
@@ -31,7 +31,7 @@ class SimpleConsumer( val host: String,
                       val bufferSize: Int ) extends Logging {
 
   private val lock = new Object()
-  private val blockingChannel = new BlockingChannel(host, port, bufferSize, 0, soTimeout)
+  private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
 
   private def connect(): BlockingChannel = {
     close

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/BlockingChannel.scala Thu Jun 14 15:17:35 2012
@@ -22,6 +22,11 @@ import java.nio.channels._
 import kafka.utils.{nonthreadsafe, Logging}
 import kafka.api.RequestOrResponse
 
+
+object BlockingChannel{
+  val UseDefaultBufferSize = -1 // passed as read/write buffer size by callers that do not pick one; presumably keeps the socket default - TODO confirm
+}
+
 /**
  *  A simple blocking channel with timeouts correctly enabled.
  *
@@ -32,7 +37,6 @@ class BlockingChannel( val host: String,
                        val readBufferSize: Int, 
                        val writeBufferSize: Int, 
                        val readTimeoutMs: Int ) extends Logging {
-
   private var connected = false
   private var channel: SocketChannel = null
   private var readChannel: ReadableByteChannel = null

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Thu Jun 14 15:17:35 2012
@@ -42,7 +42,7 @@ class SyncProducer(val config: SyncProdu
 
   private val lock = new Object()
   @volatile private var shutdown: Boolean = false
-  private val blockingChannel = new BlockingChannel(config.host, config.port, 0, config.bufferSize, config.socketTimeoutMs)
+  private val blockingChannel = new BlockingChannel(config.host, config.port, BlockingChannel.UseDefaultBufferSize, config.bufferSize, config.socketTimeoutMs)
 
   trace("Instantiating Scala Sync Producer")
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Thu Jun 14 15:17:35 2012
@@ -99,6 +99,13 @@ class KafkaConfig(props: Properties) ext
    * Following properties are relevant to Kafka replication
    */
 
+  /* the socket timeout for controller-to-broker channels */
+  val controllerSocketTimeoutMs = Utils.getInt(props, "controller.socket.timeout.ms", 30000)
+
+  /* the capacity of the per-broker request queue used for controller-to-broker channels */
+  val controllerMessageQueueSize= Utils.getInt(props, "controller.message.queue.size", 10)
+
+
   /* default replication factors for automatically created topics */
   val defaultReplicationFactor = Utils.getInt(props, "default.replication.factor", 1)
 

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1350291&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Thu Jun 14 15:17:35 2012
@@ -0,0 +1,288 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.common.KafkaZookeeperClient
+import collection.mutable.HashMap
+import collection.immutable.Set
+import kafka.cluster.Broker
+import kafka.api._
+import java.lang.Object
+import kafka.network.{Receive, BlockingChannel}
+import kafka.utils.{ZkUtils, Logging}
+import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import java.util.concurrent.atomic.AtomicBoolean
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+
+/**
+ * Thread that takes queued requests for one broker, sends each one over the broker's channel and hands the
+ * decoded response to the request's callback (when one was supplied).
+ */
+class RequestSendThread(val brokerId: Int,
+                        val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                        val channel: BlockingChannel)
+        extends Thread("requestSendThread-" + brokerId) with Logging {
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutDownLatch = new CountDownLatch(1)
+  private val lock = new Object
+
+  /** Clears the running flag, interrupts the thread and blocks until run() has counted down the latch. */
+  def shutDown(): Unit = {
+    info("Shutting down controller request send thread to broker %d".format(brokerId))
+    isRunning.set(false)
+    interrupt()
+    shutDownLatch.await()
+    info("Controller request send thread to broker %d shutting down completed".format(brokerId))
+  }
+
+  /** Send loop; it ends on interrupt or on the first error and always releases a waiting shutDown(). */
+  override def run(): Unit = {
+    try{
+      info("In controller, thread for broker: " + brokerId + " started running")
+      while(isRunning.get()){
+        val (request, callback) = queue.take()
+        val receive: Receive = lock synchronized {
+          channel.send(request)
+          channel.receive()
+        }
+        val response: RequestOrResponse = request.requestId.get match {
+          case RequestKeys.LeaderAndISRRequest => LeaderAndISRResponse.readFrom(receive.buffer)
+          case RequestKeys.StopReplicaRequest => StopReplicaResponse.readFrom(receive.buffer)
+          // an unsupported request type used to surface as a bare MatchError; name the offending id instead
+          case unknownId => throw new IllegalStateException("Unexpected request id %s sent to broker %d".format(unknownId, brokerId))
+        }
+        if(callback != null) callback(response)
+      }
+    } catch{
+      case e: InterruptedException => warn("Controller request send thread to broker %d is intterrupted. Shutting down".format(brokerId))
+      case e1 => error("Error in controller request send thread to broker %d down due to ".format(brokerId), e1)
+    } finally {
+      // counted down in finally so shutDown() cannot block forever if the handlers above throw themselves
+      shutDownLatch.countDown()
+    }
+  }
+}
+
+/**
+ * Keeps, for every broker, the channel, request queue and send thread the controller uses to reach it.
+ * The constructor connects a channel and creates a queue per broker in allBrokers; send threads are only
+ * started by startUp(), or by addBroker() for brokers that join later.
+ */
+class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{
+  private val brokers = new HashMap[Int, Broker]
+  private val messageChannels = new HashMap[Int, BlockingChannel]
+  private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
+  private val messageThreads = new HashMap[Int, RequestSendThread]
+  allBrokers.foreach(registerBroker)
+
+  /** Records the broker, creates its bounded request queue and connects its channel. */
+  private def registerBroker(broker: Broker) {
+    brokers.put(broker.id, broker)
+    messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+    info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
+    val channel = new BlockingChannel(broker.host, broker.port,
+                                      BlockingChannel.UseDefaultBufferSize,
+                                      BlockingChannel.UseDefaultBufferSize,
+                                      config.controllerSocketTimeoutMs)
+    channel.connect()
+    messageChannels.put(broker.id, channel)
+  }
+
+  /** Creates and starts the non-daemon send thread of an already registered broker. */
+  private def startSendThread(brokerId: Int) {
+    val thread = new RequestSendThread(brokerId, messageQueues(brokerId), messageChannels(brokerId))
+    thread.setDaemon(false)
+    thread.start()
+    messageThreads.put(brokerId, thread)
+  }
+
+  /** Starts the send threads of all brokers registered so far. */
+  def startUp() = brokers.keys.foreach(startSendThread)
+
+  /** Removes every broker; walks a snapshot of the ids because removeBroker mutates the broker map. */
+  def shutDown() = brokers.keys.toList.foreach(removeBroker)
+
+  /** Enqueues a request for the broker's send thread; blocks while that broker's queue is full. */
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null){
+    messageQueues(brokerId).put((request, callback))
+  }
+
+  /** Registers a broker that joined later and starts its send thread right away. */
+  def addBroker(broker: Broker){
+    registerBroker(broker)
+    startSendThread(broker.id)
+  }
+
+  /**
+   * Forgets the broker, disconnects its channel and stops its send thread; missing entries (unknown id, or
+   * threads never started by startUp()) are skipped rather than failing with NoSuchElementException.
+   */
+  def removeBroker(brokerId: Int){
+    brokers.remove(brokerId)
+    messageChannels.remove(brokerId).foreach(_.disconnect())
+    messageQueues.remove(brokerId)
+    messageThreads.remove(brokerId).foreach(_.shutDown())
+  }
+}
+
+/**
+ * Embedded controller. Each instance competes for the ephemeral node at ZkUtils.ControllerPath; the winner
+ * opens channels to all brokers and watches broker/topic changes, the others wait for the node to be
+ * deleted and then try again.
+ */
+class KafkaController(config : KafkaConfig) extends Logging {
+  info("controller startup");
+  private val lock = new Object
+
+  private var zkClient: ZkClient = null
+  private var controllerChannelManager: ControllerChannelManager = null
+  private var allBrokers : Set[Broker] = null
+  private var allTopics: Set[String] = null
+
+  // One instance per listener type: ZkClient stores listeners in a set, so subscribing the same instance
+  // again (lost election, re-election after session expiry) does not multiply the callbacks.
+  private val brokerChangeListener = new BrokerChangeListener()
+  private val topicChangeListener = new TopicChangeListener()
+  private val controllerExistListener = new ControllerExistListener()
+
+  /**
+   * Tries to create the ephemeral controller node with this broker's id. On success it subscribes to broker and topic changes,
+   * snapshots brokers and topics and starts the channel manager; if another broker created the node first it only re-subscribes to the controller node.
+   */
+  private def tryToBecomeController() = {
+    lock synchronized {
+      val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+      if (curController != null)
+        info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController)
+      else {
+        try {
+          ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString())
+          // Only reached when the node was created; otherwise the call above throws.
+          registerBrokerChangeListener()
+          registerTopicChangeListener()
+          allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+          allTopics = ZkUtils.getAllTopics(zkClient).toSet
+          controllerChannelManager = new ControllerChannelManager(allBrokers, config)
+          controllerChannelManager.startUp()
+        } catch {
+          case e: ZkNodeExistsException =>
+            registerControllerExistListener()
+            info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else")
+        }
+      }
+    }
+  }
+
+  /** True while this broker holds a channel manager, i.e. after it has won the controller election. */
+  def isActive(): Boolean = controllerChannelManager != null
+
+  /** Connects to ZooKeeper, subscribes the session and controller-node listeners and joins the election. */
+  def startup() = {
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    registerSessionExpirationListener()
+    registerControllerExistListener()
+    tryToBecomeController()
+  }
+
+  /** Stops the channel manager if there is one and closes the ZooKeeper client if startup() created it. */
+  def shutDown() = {
+    if(controllerChannelManager != null) controllerChannelManager.shutDown()
+    if(zkClient != null) zkClient.close() // zkClient is still null when startup() never ran
+  }
+
+  /** Queues a request for the given broker; fails unless this broker is currently the active controller. */
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) =
+    controllerChannelManager.sendRequest(brokerId, request, callback)
+
+  private def registerBrokerChangeListener() = zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, brokerChangeListener)
+  private def registerTopicChangeListener() = zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicChangeListener)
+  private def registerSessionExpirationListener() = zkClient.subscribeStateChanges(new SessionExpireListener())
+  private def registerControllerExistListener() { zkClient.subscribeDataChanges(ZkUtils.ControllerPath, controllerExistListener) }
+
+  class SessionExpireListener() extends IZkStateListener {
+    @throws(classOf[Exception])
+    def handleStateChanged(state: KeeperState) {
+      // do nothing, since zkclient will do reconnect for us.
+    }
+
+    /**
+     * Called after the zookeeper session has expired and a new session has been created: drops the controller
+     * state left over from the old session, if any, and runs the election again.
+     */
+    @throws(classOf[Exception])
+    def handleNewSession() {
+      info("Controller session expires, clean up the state, current controller: " + config.brokerId)
+      lock synchronized {
+        // only the broker that was the controller has a channel manager; the others have nothing to stop
+        if(controllerChannelManager != null){
+          controllerChannelManager.shutDown()
+          controllerChannelManager = null
+        }
+      }
+      info("Controller session expires, the channel manager shut downr: " + config.brokerId)
+      tryToBecomeController()
+    }
+  }
+
+  class BrokerChangeListener() extends IZkChildListener with Logging {
+    def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
+      import scala.collection.JavaConversions._
+      lock synchronized {
+        info("Broker change listener at controller triggerred")
+        // the subscription outlives a lost controllership; without a channel manager there is nothing to update
+        if(controllerChannelManager != null){
+          val knownBrokerIds = allBrokers.map(_.id)
+          val curBrokerIdsSeq = (javaCurChildren: Seq[String]).map(_.toInt)
+          val curBrokerIds = curBrokerIdsSeq.toSet
+          val addedBrokerIds = curBrokerIds -- knownBrokerIds
+          val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
+          info("Added brokers: " + addedBrokerIds.toString())
+          val deletedBrokerIds = knownBrokerIds -- curBrokerIds
+          info("Deleted brokers: " + deletedBrokerIds.toString())
+
+          allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
+          for(broker <- addedBrokersSeq) controllerChannelManager.addBroker(broker)
+          for(brokerId <- deletedBrokerIds) controllerChannelManager.removeBroker(brokerId)
+          /** TODO: add other broker change handler logic**/
+        }
+      }
+    }
+  }
+
+  class TopicChangeListener extends IZkChildListener with Logging {
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      // TODO: Incomplete, do not need to review this time
+    }
+  }
+
+  class ControllerExistListener extends IZkDataListener with Logging {
+    @throws(classOf[Exception])
+    def handleDataChange(dataPath: String, data: Object) {
+      // do nothing, since No logic is needed here
+    }
+
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+      info("Controller fail over, broker " + config.brokerId + " try to become controller")
+      tryToBecomeController()
+    }
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Jun 14 15:17:35 2012
@@ -42,13 +42,14 @@ class KafkaServer(val config: KafkaConfi
   var kafkaZookeeper: KafkaZooKeeper = null
   private var replicaManager: ReplicaManager = null
   private var apis: KafkaApis = null
+  var kafkaController: KafkaController = new KafkaController(config)
 
   /**
    * Start up API for bringing up a single instance of the Kafka server.
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
-    info("Starting Kafka server...")
+    info("Starting Kafka server..." + config.brokerId)
     isShuttingDown = new AtomicBoolean(false)
     shutdownLatch = new CountDownLatch(1)
     var needRecovery = true
@@ -89,6 +90,8 @@ class KafkaServer(val config: KafkaConfi
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup
 
+    kafkaController.startup()
+
     info("Server started.")
   }
   
@@ -110,13 +113,16 @@ class KafkaServer(val config: KafkaConfi
       Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
         logManager.close()
+      if(kafkaController != null)
+        kafkaController.shutDown()
+
       kafkaZookeeper.close
 
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
       cleanShutDownFile.createNewFile
       shutdownLatch.countDown()
-      info("Kafka server shut down completed")
+      info("Kafka server with id %d shut down completed".format(config.brokerId))
     }
   }
   

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Thu Jun 14 15:17:35 2012
@@ -32,6 +32,7 @@ object ZkUtils extends Logging {
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
   val BrokerStatePath = "/brokers/state"
+  val ControllerPath = "/controller"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -41,10 +42,19 @@ object ZkUtils extends Logging {
     getTopicPath(topic) + "/partitions"
   }
 
+  /** Broker id stored at ControllerPath; fails with a descriptive error when no controller is registered. */
+  def getController(zkClient: ZkClient): Int =
+    Option(readDataMaybeNull(zkClient, ControllerPath)).map(_.toInt).getOrElse(
+      throw new IllegalStateException("No controller registered at " + ControllerPath))
+
   def getTopicPartitionPath(topic: String, partitionId: String): String ={
     getTopicPartitionsPath(topic) + "/" + partitionId
   }
 
+  /** Path of the "leaderAndISR" child below the path of the given topic partition. */
+  def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String =
+    getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
+
   def getTopicVersion(zkClient: ZkClient, topic: String): String ={
     readDataMaybeNull(zkClient, getTopicPath(topic))
   }

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala?rev=1350291&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala Thu Jun 14 15:17:35 2012
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.controller
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.utils.TestUtils._
+import junit.framework.Assert._
+import kafka.server.{KafkaServer, KafkaConfig}
+import kafka.api._
+import java.util.concurrent.CountDownLatch
+import java.util.concurrent.atomic.AtomicInteger
+import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils}
+
+
+/** Runs four brokers on the test harness; checks controller fail-over and controller-to-broker request round trips. */
+class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness  {
+  val props = createBrokerConfigs(4)
+  val configs = props.map(p => new KafkaConfig(p))
+  var brokers: Seq[KafkaServer] = null
+
+  override def setUp() {
+    super.setUp()
+    brokers = configs.map(config => TestUtils.createServer(config))
+  }
+
+  override def tearDown() {
+    // stop the brokers before the harness: super.tearDown() presumably stops ZooKeeper, which they still use (TODO confirm)
+    brokers.foreach(_.shutdown())
+    super.tearDown()
+  }
+
+  /** Broker 2 must be controller once it is the only broker left; broker 1 takes over after 2 stops. */
+  def testControllerFailOver(){
+    brokers(0).shutdown()
+    brokers(1).shutdown()
+    brokers(3).shutdown()
+    Thread.sleep(1000)
+
+    // JUnit's assertEquals takes the expected value first, so failures are reported the right way round
+    assertEquals("2", ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath))
+
+    brokers(1).startup()
+    brokers(2).shutdown()
+    Thread.sleep(1000)
+    assertEquals("1", ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath))
+  }
+
+  /** Sends both sample requests from the active controller to brokers 0-3 and expects a matching response for each. */
+  def testControllerCommandSend(){
+    Thread.sleep(1000)
+    val brokerIds = 0 to 3
+    for(broker <- brokers){
+      if(broker.kafkaController.isActive){
+        val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
+        val stopReplicaRequest = ControllerTestUtils.createSampleStopReplicaRequest()
+
+        val successCount: AtomicInteger = new AtomicInteger(0)
+        val countDownLatch: CountDownLatch = new CountDownLatch(2 * brokerIds.size)
+
+        def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
+          val expectedResponse = ControllerTestUtils.createSampleLeaderAndISRResponse()
+          if(response.equals(expectedResponse))
+            successCount.addAndGet(1)
+          countDownLatch.countDown()
+        }
+
+        def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
+          val expectedResponse = ControllerTestUtils.createSampleStopReplicaResponse()
+          if(response.equals(expectedResponse))
+            successCount.addAndGet(1)
+          countDownLatch.countDown()
+        }
+
+        for(id <- brokerIds) broker.kafkaController.sendRequest(id, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+        for(id <- brokerIds) broker.kafkaController.sendRequest(id, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+        // bounded wait so that a lost response fails the test instead of hanging the suite
+        assertTrue("Timed out waiting for controller responses",
+                   countDownLatch.await(30, java.util.concurrent.TimeUnit.SECONDS))
+
+        assertEquals(2 * brokerIds.size, successCount.get())
+      }
+    }
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1350291&r1=1350290&r2=1350291&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Jun 14 15:17:35 2012
@@ -31,10 +31,13 @@ import org.I0Itec.zkclient.ZkClient
 import kafka.cluster.Broker
 import collection.mutable.ListBuffer
 import kafka.consumer.ConsumerConfig
-import kafka.api.{TopicData, PartitionData}
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.TimeUnit
 import kafka.serializer.{DefaultEncoder, Encoder}
+import kafka.common.ErrorMapping
+import kafka.api._
+import collection.mutable.{Map, Set}
+
 
 /**
  * Utility functions to help with testing
@@ -396,6 +399,52 @@ object TestUtils extends Logging {
 
 }
 
+/**
+ * Canned controller requests, plus the responses the controller test expects for them. All of them refer
+ * to partitions 1 and 2 of the topics "test1" and "test2".
+ */
+object ControllerTestUtils{
+  private val topic1 = "test1"
+  private val topic2 = "test2"
+
+  /** Fresh map that reports ErrorMapping.NoError for each of the four sample partitions. */
+  private def noErrorForEveryPartition =
+    Map((topic1, 1) -> ErrorMapping.NoError, (topic1, 2) -> ErrorMapping.NoError,
+        (topic2, 1) -> ErrorMapping.NoError, (topic2, 2) -> ErrorMapping.NoError)
+
+  /**
+   * Sample LeaderAndISRRequest: both partitions of "test1" use leader 1 with ISR 1,2,3 and both
+   * partitions of "test2" use leader 2 with ISR 2,3,4.
+   */
+  def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
+    val leaderAndISR1 = new LeaderAndISR(1, 1, List(1, 2, 3), 1)
+    val leaderAndISR2 = new LeaderAndISR(2, 1, List(2, 3, 4), 2)
+    val leaderAndISRByPartition = Map((topic1, 1) -> leaderAndISR1, (topic1, 2) -> leaderAndISR1,
+                                      (topic2, 1) -> leaderAndISR2, (topic2, 2) -> leaderAndISR2)
+    new LeaderAndISRRequest(1, "client 1", 1, 4, leaderAndISRByPartition)
+  }
+
+  /** Sample LeaderAndISRResponse carrying NoError for every sample partition. */
+  def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
+    new LeaderAndISRResponse(1, noErrorForEveryPartition)
+  }
+
+
+  /** Sample StopReplicaRequest covering the four sample partitions. */
+  def createSampleStopReplicaRequest() : StopReplicaRequest = {
+    new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
+                                                    (topic2, 1), (topic2, 2)))
+  }
+
+  /** Sample StopReplicaResponse carrying NoError for every sample partition. */
+  def createSampleStopReplicaResponse() : StopReplicaResponse = {
+    new StopReplicaResponse(1, noErrorForEveryPartition)
+  }
+}
+
+
+
+
 object TestZKUtils {
   val zookeeperConnect = "127.0.0.1:2182"  
 }