Posted to commits@kafka.apache.org by ne...@apache.org on 2012/04/24 01:33:00 UTC

svn commit: r1329509 - in /incubator/kafka/branches/0.8: bin/ core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/producer/ core/src/main/scala/kafka/producer/async/ core/src/main/scala/kafka/server/ core/src/ma...

Author: nehanarkhede
Date: Mon Apr 23 23:32:59 2012
New Revision: 1329509

URL: http://svn.apache.org/viewvc?rev=1329509&view=rev
Log:
KAFKA-301 Implement broker startup procedure; patched by Neha Narkhede; reviewed by Jun Rao and Jay Kreps

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/QueueFullException.scala
Modified:
    incubator/kafka/branches/0.8/bin/run-rat.sh   (props changed)
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala

Propchange: incubator/kafka/branches/0.8/bin/run-rat.sh
------------------------------------------------------------------------------
    svn:executable = *

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Mon Apr 23 23:32:59 2012
@@ -27,6 +27,7 @@ import collection.mutable.HashMap
 
 object AdminUtils extends Logging {
   val rand = new Random
+  val AdminEpoch = -1
 
   /**
    * There are 2 goals of replica assignment:
@@ -69,7 +70,6 @@ object AdminUtils extends Logging {
         replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
       ret(i) = replicaList.reverse
     }
-
     ret
   }
 
@@ -102,14 +102,14 @@ object AdminUtils extends Logging {
 
         for (i <-0 until partitionMetadata.size) {
           val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
-          val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
+          val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i))
           val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
           debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
           partitionMetadata(i) = new PartitionMetadata(partitions(i),
             leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
             getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
-            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
+            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
             None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
         }
         Some(new TopicMetadata(topic, partitionMetadata))
@@ -117,7 +117,6 @@ object AdminUtils extends Logging {
         None
       }
     }
-
     metadataList.toList
   }
 

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when a get epoch request is made for a partition, but no epoch exists for that partition
+ */
+class NoEpochForPartitionException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/* Indicates that the queue of unsent messages is full */
+class QueueFullException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Mon Apr 23 23:32:59 2012
@@ -18,11 +18,11 @@ package kafka.producer
 
 import async._
 import kafka.utils._
-import kafka.common.InvalidConfigException
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
 import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
 import org.I0Itec.zkclient.ZkClient
+import kafka.common.{QueueFullException, InvalidConfigException}
 
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
@@ -120,6 +120,7 @@ extends Logging {
   def close() = {
     val canShutdown = hasShutdown.compareAndSet(false, true)
     if(canShutdown) {
+      info("Shutting down producer")
       if (producerSendThread != null)
         producerSendThread.shutdown
       eventHandler.close

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Mon Apr 23 23:32:59 2012
@@ -105,4 +105,7 @@ class KafkaConfig(props: Properties) ext
   * leader election on all replicas minus the preferred replica */
   val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
 
+  /* size of the state change request queue in Zookeeper */
+  val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
+
  }
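
For illustration, the new setting is overridden like any other broker property; the value below is hypothetical and 1000 is the default from this patch:

    # server.properties
    state.change.queue.size=500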

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Mon Apr 23 23:32:59 2012
@@ -23,6 +23,8 @@ import java.net.InetAddress
 import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
 import kafka.cluster.Replica
 import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import kafka.admin.AdminUtils
+import java.lang.{Thread, IllegalStateException}
 
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -36,11 +38,10 @@ class KafkaZooKeeper(config: KafkaConfig
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
   private var zkClient: ZkClient = null
-  var topics: List[String] = Nil
-  val lock = new Object()
-  var existingTopics: Set[String] = Set.empty[String]
-  val leaderChangeListener = new LeaderChangeListener
-  val topicPartitionsChangeListener = new TopicChangeListener
+  private val leaderChangeListener = new LeaderChangeListener
+  private val topicPartitionsChangeListener = new TopicChangeListener
+  private var stateChangeHandler: StateChangeCommandHandler = null
+
   private val topicListenerLock = new Object
   private val leaderChangeLock = new Object
 
@@ -48,6 +49,7 @@ class KafkaZooKeeper(config: KafkaConfig
     /* start client */
     info("connecting to ZK: " + config.zkConnect)
     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    startStateChangeCommandHandler()
     zkClient.subscribeStateChanges(new SessionExpireListener)
     registerBrokerInZk()
     subscribeToTopicAndPartitionsChanges(true)
@@ -60,6 +62,13 @@ class KafkaZooKeeper(config: KafkaConfig
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
 
+  private def startStateChangeCommandHandler() {
+    val stateChangeQ = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)
+    stateChangeHandler = new StateChangeCommandHandler("StateChangeCommandHandler", config, stateChangeQ,
+      ensureStateChangeCommandValidityOnThisBroker, ensureEpochValidity)
+    stateChangeHandler.start()
+  }
+
   /**
    *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
    *  connection for us. We need to re-register this broker in the broker registry.
@@ -93,6 +102,7 @@ class KafkaZooKeeper(config: KafkaConfig
 
   def close() {
     if (zkClient != null) {
+      stateChangeHandler.shutdown()
       info("Closing zookeeper client...")
       zkClient.close()
     }
@@ -184,7 +194,6 @@ class KafkaZooKeeper(config: KafkaConfig
       case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
       case None => // leader election
         leaderElection(replica)
-
     }
   }
 
@@ -201,9 +210,12 @@ class KafkaZooKeeper(config: KafkaConfig
       }catch {
         case e => // ignoring
       }
-      if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) {
-        info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
-        // TODO: Become leader as part of KAFKA-302
+      val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
+      newLeaderEpoch match {
+        case Some(epoch) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
+          // TODO: Become leader as part of KAFKA-302
+        case None =>
       }
     }
   }
@@ -233,6 +245,26 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
+  private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
+    // check if this broker hosts a replica for this topic and partition
+    ZkUtils.isPartitionOnBroker(zkClient, stateChangeCommand.topic, stateChangeCommand.partition, config.brokerId)
+  }
+
+  private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = {
+    // get the topic and partition that this request is meant for
+    val topic = stateChangeCommand.topic
+    val partition = stateChangeCommand.partition
+    val epoch = stateChangeCommand.epoch
+
+    val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
+    // check if the request's epoch matches the current leader's epoch OR the admin command's epoch
+    val validEpoch = (currentLeaderEpoch == epoch) || (epoch == AdminUtils.AdminEpoch)
+    if(epoch > currentLeaderEpoch)
+      throw new IllegalStateException(("Illegal epoch state. Request's epoch %d is larger than registered epoch %d for " +
+        "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
+    validEpoch
+  }
+
   class LeaderChangeListener extends IZkDataListener with Logging {
 
     @throws(classOf[Exception])
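
Note on the validity rule above: a state change request is acted on only when its epoch equals the partition's current leader epoch, or when it carries the admin sentinel AdminUtils.AdminEpoch (-1); a request epoch newer than the registered one is an illegal state. A minimal standalone sketch of the same rule (the names below are illustrative, not part of the patch):

    def isValidEpoch(requestEpoch: Int, currentLeaderEpoch: Int): Boolean = {
      val adminEpoch = -1  // AdminUtils.AdminEpoch
      val valid = (requestEpoch == currentLeaderEpoch) || (requestEpoch == adminEpoch)
      if (requestEpoch > currentLeaderEpoch)
        throw new IllegalStateException("request epoch %d is larger than registered epoch %d"
          .format(requestEpoch, currentLeaderEpoch))
      valid
    }
    // isValidEpoch(3, 3)  == true   -> request from the current leader
    // isValidEpoch(-1, 3) == true   -> admin command bypasses the epoch check
    // isValidEpoch(2, 3)  == false  -> stale request is rejected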

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import util.parsing.json.JSON
+import java.lang.IllegalStateException
+import kafka.utils.{Utils, Logging}
+import collection.mutable.HashMap
+
+object StateChangeCommand extends Logging {
+  val State = "state"
+  val Topic = "topic"
+  val Partition = "partition"
+  val Epoch = "epoch"
+  val StartReplica = "start-replica"
+  val CloseReplica = "close-replica"
+
+  def getStateChangeRequest(requestJson: String): StateChangeCommand = {
+    var topMap : Map[String, String] = null
+    try {
+      JSON.parseFull(requestJson) match {
+        case Some(m) =>
+          topMap = m.asInstanceOf[Map[String, String]]
+          val topic = topMap.get(StateChangeCommand.Topic).getOrElse(null)
+          val partition = topMap.get(StateChangeCommand.Partition).getOrElse("-1").toInt
+          val epoch = topMap.get(StateChangeCommand.Epoch).getOrElse("-1").toInt
+          val requestOpt = topMap.get(StateChangeCommand.State)
+          requestOpt match {
+            case Some(request) =>
+              request match {
+                case StartReplica => new StartReplica(topic, partition, epoch)
+                case CloseReplica => new CloseReplica(topic, partition, epoch)
+                case _ => throw new IllegalStateException("Unknown state change request " + request)
+              }
+            case None =>
+              throw new IllegalStateException("Illegal state change request JSON " + requestJson)
+          }
+        case None => throw new RuntimeException("Error parsing state change request : " + requestJson)
+      }
+    } catch {
+      case e =>
+        error("Error parsing state change request JSON " + requestJson, e)
+        throw e
+    }
+  }
+}
+
+sealed trait StateChangeCommand extends Logging {
+  def state: String
+
+  def topic: String
+
+  def partition: Int
+
+  def epoch: Int
+
+  def toJson(): String = {
+    val jsonMap = new HashMap[String, String]
+    jsonMap.put(StateChangeCommand.State, state)
+    jsonMap.put(StateChangeCommand.Topic, topic)
+    jsonMap.put(StateChangeCommand.Partition, partition.toString)
+    jsonMap.put(StateChangeCommand.Epoch, epoch.toString)
+    Utils.stringMapToJsonString(jsonMap)
+  }
+}
+
+/* The elected leader sends the start replica state change request to all the new replicas that have been assigned
+* a partition. Note that the followers must act on this request only if the request's epoch equals the latest partition epoch or the admin epoch (-1) */
+case class StartReplica(topic: String, partition: Int, epoch: Int) extends StateChangeCommand {
+  val state: String = StateChangeCommand.StartReplica
+}
+
+/* The elected leader sends the close replica state change request to all the replicas of a partition that has been
+*  unassigned or whose topic has been deleted. Note that the followers must act on this request even if the epoch has changed */
+case class CloseReplica(topic: String, partition: Int, epoch: Int) extends StateChangeCommand {
+  val state: String = StateChangeCommand.CloseReplica
+}
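
For illustration, a StartReplica command round-trips through the JSON helpers above roughly as follows (a sketch; key order in the emitted JSON depends on HashMap iteration order):

    val cmd = StartReplica("foo", 0, 1)
    val json = cmd.toJson()
    // json looks like: { "state": "start-replica","topic": "foo","partition": "0","epoch": "1" }
    val parsed = StateChangeCommand.getStateChangeRequest(json)
    assert(parsed == cmd)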

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import kafka.utils.{ZkQueue, Logging}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+
+class StateChangeCommandHandler(name: String, config: KafkaConfig, stateChangeQ: ZkQueue,
+                                ensureStateChangeCommandValidityOnThisBroker: (StateChangeCommand) => Boolean,
+                                ensureEpochValidity: (StateChangeCommand) => Boolean) extends Thread(name) with Logging {
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+
+  override def run() {
+    try {
+      while(isRunning.get()) {
+        // get outstanding state change requests for this broker
+        val command = stateChangeQ.take()
+        val stateChangeCommand = StateChangeCommand.getStateChangeRequest(command._2)
+        // act only on commands meant for a replica hosted on this broker
+        if(ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand)) {
+          stateChangeCommand match {
+            case StartReplica(topic, partition, epoch) =>
+              if(ensureEpochValidity(stateChangeCommand))
+                handleStartReplica(topic, partition)
+            case CloseReplica(topic, partition, epoch) =>
+              /**
+               * close replica requests are sent as part of the delete topic or partition reassignment process.
+               * To ensure that a topic will be deleted even if the broker is offline, this state change should not
+               * be protected with the epoch validity check
+               */
+              handleCloseReplica(topic, partition)
+          }
+        }
+        stateChangeQ.remove(command)
+      }
+    }catch {
+      case e: InterruptedException => info("State change command handler interrupted. Shutting down")
+      case e1 => error("Error in state change command handler. Shutting down", e1)
+    }
+    shutdownComplete()
+  }
+
+  private def shutdownComplete() = shutdownLatch.countDown
+
+  def shutdown() {
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("State change command handler shutdown completed")
+  }
+
+  def handleStartReplica(topic: String, partition: Int) {
+    info("Received start replica state change command for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of create topic support or partition reassignment support. Until then, it is unused
+  }
+
+  def handleCloseReplica(topic: String, partition: Int) {
+    info("Received close replica state change command for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of delete topic support. Until then, it is unused
+  }
+}
\ No newline at end of file
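
As a wiring sketch, the handler is constructed and started the way KafkaZooKeeper.startStateChangeCommandHandler() does it (zkClient and config setup omitted; the two callbacks below are placeholders for the real validity checks):

    val queue = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)
    val handler = new StateChangeCommandHandler("StateChangeCommandHandler", config, queue,
      cmd => true,  // placeholder: does this broker host a replica of cmd's partition?
      cmd => true)  // placeholder: is cmd's epoch current (or the admin epoch)?
    handler.start()
    // on broker shutdown:
    handler.shutdown()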

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Apr 23 23:32:59 2012
@@ -700,6 +700,21 @@ object Utils extends Logging {
       case _ => // swallow
     }
   }
+
+  def stringMapToJsonString(jsonDataMap: Map[String, String]): String = {
+    val builder = new StringBuilder
+    builder.append("{ ")
+    var numElements = 0
+    for ( (key, value) <- jsonDataMap) {
+      if (numElements > 0)
+        builder.append(",")
+      builder.append("\"" + key + "\": ")
+      builder.append("\"" + value + "\"")
+      numElements += 1
+    }
+    builder.append(" }")
+    builder.toString
+  }
 }
 
 class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
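
As a quick illustration of the output shape (mirroring the usage in StateChangeCommand.toJson(); note the helper quotes every value and does not escape embedded quotes):

    val m = new collection.mutable.HashMap[String, String]
    m.put("topic", "foo")
    m.put("partition", "0")
    Utils.stringMapToJsonString(m)
    // => { "topic": "foo","partition": "0" }  (element order follows map iteration order)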

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.utils
+
+import kafka.utils.ZkUtils._
+import kafka.common.QueueFullException
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import java.util.concurrent.PriorityBlockingQueue
+import java.util.Comparator
+
+class ZkQueue(zkClient: ZkClient, path: String, size: Int) {
+  // create the queue in ZK, if one does not exist
+  makeSurePersistentPathExists(zkClient, path)
+  val queueItems = new PriorityBlockingQueue[String](size, new ZkQueueComparator)
+  var latestQueueItemPriority: Int = -1
+  zkClient.subscribeChildChanges(path, new ZkQueueListener)
+
+  // TODO: This API will be used by the leader to enqueue state change requests to the followers
+  /**
+   * Inserts the specified element into this zookeeper queue. This method never blocks; if the queue is full,
+   * it throws QueueFullException
+   * @param item Item to add to the zookeeper queue
+   * @return The zookeeper location of the item in the queue
+   */
+  def put(item: String): String = {
+    // if queue is full, throw QueueFullException
+    if(isFull)
+      throw new QueueFullException("Queue is full. Item %s will be rejected".format(item))
+    val queueLocation = createSequentialPersistentPath(zkClient, path + "/", item)
+    debug("Added item %s to queue at location %s".format(item, queueLocation))
+    queueLocation
+  }
+
+  /**
+   * Reads all the items and their queue locations in this queue
+   * @return A list of (queue_location, item) pairs
+   */
+  def readAll(): Seq[(String, String)] = {
+    val allItems = getChildren(zkClient, path).sorted
+    allItems.size match {
+      case 0 => Seq.empty[(String, String)]
+      case _ => allItems.map { item =>
+        // read the data at each queue location
+        val queueLocation = path + "/" + item
+        val data = ZkUtils.readData(zkClient, queueLocation)
+        (item, data)
+      }
+    }
+  }
+
+  /**
+   * Returns true if this zookeeper queue contains no elements.
+   */
+  def isEmpty: Boolean = (readAll().size == 0)
+
+  // TODO: Implement the queue shrink operation if the queue is full, as part of create/delete topic
+  /**
+   * Returns true if the number of items in this zookeeper queue equals the queue's maximum size
+   */
+  def isFull: Boolean = (readAll().size == size)
+
+  /**
+   * Retrieves the head of this queue, waiting if necessary until an element becomes available. The item stays in
+   * zookeeper until remove() is called with the returned location.
+   * @return The location of the head and the head element in the zookeeper queue
+   */
+  def take(): (String, String) = {
+    // take the element key
+    val item = queueItems.take()
+    val queueLocation = path + "/" + item
+    val data = ZkUtils.readData(zkClient, queueLocation)
+    (item, data)
+  }
+
+  /**
+   * Removes a single instance of the specified element from this queue, if it is present. Returns true if this queue
+   * contained the specified element (or equivalently, if this queue changed as a result of the call).
+   * @param queueItem A tuple where the first element is the location of the item as returned by the take() API and the
+   * second element is the queue item to be removed
+   */
+  def remove(queueItem: (String, String)): Boolean = {
+    val queueLocation = path + "/" + queueItem._1
+    // we do not want to remove items from the queue if they were not read
+    assert(!queueItems.contains(queueItem._1), "Attempt to remove unconsumed item %s from the queue".format(queueItem))
+    ZkUtils.deletePath(zkClient, queueLocation)
+  }
+
+  class ZkQueueListener extends IZkChildListener with Logging {
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      debug("ZkQueue listener fired for queue %s with children %s and latest queue item priority %d"
+        .format(path, curChilds.toString, latestQueueItemPriority))
+      import scala.collection.JavaConversions._
+      val outstandingRequests = asBuffer(curChilds).sortWith((req1, req2) => req1.toInt < req2.toInt)
+      outstandingRequests.foreach { req =>
+        val queueItemPriority = req.toInt
+        if(queueItemPriority > latestQueueItemPriority) {
+          latestQueueItemPriority = queueItemPriority
+          queueItems.add(req)
+          debug("Added item %s to queue %s".format(req, path))
+        }
+      }
+    }
+  }
+
+  class ZkQueueComparator extends Comparator[String] {
+    def compare(element1: String, element2: String): Int = {
+      element1.toInt - element2.toInt
+    }
+  }
+}
\ No newline at end of file
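
Taken together, the put/take/remove cycle looks roughly like this (a sketch; the child-change listener feeds the in-memory priority queue as znodes appear under the path):

    val q = new ZkQueue(zkClient, "/brokers/state/0", 10)
    val location = q.put("""{ "state": "start-replica","topic": "foo","partition": "0","epoch": "1" }""")
    val (head, data) = q.take()  // blocks until the listener delivers the next item
    // ... act on data ...
    q.remove((head, data))       // deletes the znode; only consumed items may be removed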

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Apr 23 23:32:59 2012
@@ -25,11 +25,13 @@ import org.I0Itec.zkclient.exception.{Zk
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import java.util.concurrent.locks.Condition
+import kafka.common.NoEpochForPartitionException
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val BrokerStatePath = "/brokers/state"
 
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
@@ -59,6 +61,10 @@ object ZkUtils extends Logging {
     getTopicPartitionPath(topic, partitionId) + "/" + "leader"
   }
 
+  def getBrokerStateChangePath(brokerId: Int): String = {
+    BrokerStatePath + "/" + brokerId
+  }
+
   def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
       ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
@@ -69,9 +75,37 @@ object ZkUtils extends Logging {
   }
 
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
-    val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
-    if(leader == null) None
-    else Some(leader.toInt)
+    val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+    if(leaderAndEpoch == null) None
+    else {
+      val leaderAndEpochInfo = leaderAndEpoch.split(";")
+      Some(leaderAndEpochInfo.head.toInt)
+    }
+  }
+
+  /**
+   * This API reads the epoch from the ISR path. Reading only the ISR path is sufficient: if the leader fails after
+   * updating the epoch in the leader path but before updating it in the ISR path, some other broker will simply
+   * retry becoming leader with the same new epoch value.
+   */
+  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+    val lastKnownEpoch = try {
+      val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
+      if(isrAndEpoch != null) {
+        val isrAndEpochInfo = isrAndEpoch.split(";")
+        if(isrAndEpochInfo.last.isEmpty)
+          throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
+        else
+          isrAndEpochInfo.last.toInt
+      }else {
+        throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
+      }
+    }catch {
+      case e: ZkNoNodeException =>
+        throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
+      case e1 => throw e1
+    }
+    lastKnownEpoch
   }
 
   def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
@@ -83,22 +117,60 @@ object ZkUtils extends Logging {
     }
   }
 
+  def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
+    val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
+    if(replicaListAndEpochString == null)
+      Seq.empty[Int]
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
+    }
+  }
+
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
     val replicas = getReplicasForPartition(zkClient, topic, partition)
     debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
     try {
-      createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
-      true
+      // NOTE: first increment epoch, then become leader
+      val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
+      createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
+        "%d;%d".format(brokerId, newEpoch))
+      val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
+        "%s;%d".format(currentISR.mkString(","), newEpoch))
+      info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
+      Some(newEpoch)
     } catch {
-      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
-      case oe => false
+      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
+      case oe => None
     }
   }
 
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
+    // read the previous epoch and increment it; the caller writes the new epoch to the leader and ISR paths
+    val epoch = try {
+      Some(getEpochForPartition(client, topic, partition))
+    }catch {
+      case e: NoEpochForPartitionException => None
+      case e1 => throw e1
+    }
+
+    val newEpoch = epoch match {
+      case Some(partitionEpoch) =>
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, epoch))
+        partitionEpoch + 1
+      case None =>
+        // this is the first time leader is elected for this partition. So set epoch to 1
+        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
+        1
+    }
+    newEpoch
+  }
+
   def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
     val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
     val broker = new Broker(id, creator, host, port)
@@ -186,7 +258,7 @@ object ZkUtils extends Logging {
   /**
   * Create a persistent node with the given path and data. Create parents if necessary.
    */
-  def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+  def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
     try {
       client.createPersistent(path, data)
     }
@@ -198,6 +270,10 @@ object ZkUtils extends Logging {
     }
   }
 
+  def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String = {
+    client.createPersistentSequential(path, data)
+  }
+
   /**
    * Update the value of a persistent node with the given path and data.
   * create parent directory if necessary. Never throw NodeExistException.
@@ -238,7 +314,7 @@ object ZkUtils extends Logging {
     }
   }
 
-  def deletePath(client: ZkClient, path: String) {
+  def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
     }
@@ -246,6 +322,7 @@ object ZkUtils extends Logging {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
+        false
       case e2 => throw e2
     }
   }
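
The net effect of tryToBecomeLeaderForPartition is that the epoch rides along in the znode data, separated from the leader id (or the ISR list) by ';'. Schematically, after broker 0 wins the election with epoch 3 for topic "foo" partition 0 and the ISR is brokers 0 and 1 (the exact znode names come from getTopicPartitionLeaderPath and getTopicPartitionInSyncPath):

    <leader path for foo/0>  ->  "0;3"    // "%d;%d".format(brokerId, newEpoch), ephemeral
    <isr path for foo/0>     ->  "0,1;3"  // "%s;%d".format(currentISR.mkString(","), newEpoch)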

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Mon Apr 23 23:32:59 2012
@@ -24,7 +24,6 @@ import org.easymock.EasyMock
 import org.junit.Test
 import kafka.api._
 import kafka.cluster.Broker
-import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
 import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
 import kafka.producer.async._
 import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
@@ -35,6 +34,7 @@ import collection.Map
 import collection.mutable.ListBuffer
 import org.scalatest.junit.JUnit3Suite
 import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException, QueueFullException}
 
 class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -56,7 +56,7 @@ class AsyncProducerTest extends JUnit3Su
     val mockEventHandler = new EventHandler[String,String] {
 
       def handle(events: Seq[ProducerData[String,String]]) {
-        Thread.sleep(1000000)
+        Thread.sleep(500)
       }
 
       def close {}
@@ -79,6 +79,8 @@ class AsyncProducerTest extends JUnit3Su
     }
     catch {
       case e: QueueFullException => //expected
+    }finally {
+      producer.close()
     }
   }
 
@@ -319,6 +321,8 @@ class AsyncProducerTest extends JUnit3Su
       fail("Should fail with ClassCastException due to incompatible Encoder")
     } catch {
       case e: ClassCastException =>
+    }finally {
+      producer.close()
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Mon Apr 23 23:32:59 2012
@@ -22,7 +22,7 @@ import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import junit.framework.Assert._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{ZkUtils, Utils, TestUtils}
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
 
@@ -35,27 +35,22 @@ class LeaderElectionTest extends JUnit3S
   val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
 
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   override def setUp() {
     super.setUp()
-
-    // start both servers
-    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
-    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-
-    servers ++= List(server1, server2)
   }
 
   override def tearDown() {
-    // shutdown the servers and delete data hosted on them
-    servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
-
     super.tearDown()
   }
 
   def testLeaderElectionWithCreateTopic {
+    var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+    // start both servers
+    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+
+    servers ++= List(server1, server2)
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -64,15 +59,16 @@ class LeaderElectionTest extends JUnit3S
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-
-    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+    assertTrue("Leader should get elected", leader.isDefined)
+    // NOTE: this is to avoid transient test failures
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
     // kill the server hosting the preferred replica
     servers.head.shutdown()
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     Thread.sleep(zookeeper.tickTime)
@@ -81,7 +77,6 @@ class LeaderElectionTest extends JUnit3S
     servers.head.startup()
 
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-    // TODO: Once the optimization for preferred replica re-election is in, this check should change to broker 0
     assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
 
     // shutdown current leader (broker 1)
@@ -90,5 +85,41 @@ class LeaderElectionTest extends JUnit3S
 
     // test if the leader is the preferred replica
     assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+    // shutdown the servers and delete data hosted on them
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDir))
+  }
+
+  // Assuming leader election happens correctly, test if epoch changes as expected
+  def testEpoch() {
+    // keep switching leaders to see if epoch changes correctly
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // setup 2 brokers in ZK
+    val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
+
+    try {
+      // create topic with 1 partition, 2 replicas, one on each broker
+      CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+      var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+      assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+      assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get)
+
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+      assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+      assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get)
+
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+      newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+      assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+      assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get)
+
+    }finally {
+      TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
+    }
+
   }
 }
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.common.QueueFullException
+import junit.framework.Assert._
+import kafka.utils.{ZkQueue, TestUtils}
+
+class StateChangeTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val brokerId1 = 0
+  val port1 = TestUtils.choosePort()
+  var stateChangeQ: ZkQueue = null
+  val config = new KafkaConfig(TestUtils.createBrokerConfig(brokerId1, port1))
+
+  override def setUp() {
+    super.setUp()
+
+    // create a queue
+    val queuePath = "/brokers/state/" + config.brokerId
+    stateChangeQ = new ZkQueue(zkClient, queuePath, 10)
+  }
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  def testZkQueueDrainAll() {
+    for(i <- 0 until 5) {
+      val itemPath = stateChangeQ.put("test:0:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    var numItems: Int = 0
+    for(i <- 0 until 5) {
+      val item = stateChangeQ.take()
+      assertEquals("test:0:follower", item._2)
+      assertTrue(stateChangeQ.remove(item))
+      numItems += 1
+    }
+    assertEquals(5, numItems)
+
+    for(i <- 5 until 10) {
+      val itemPath = stateChangeQ.put("test:1:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i+5, item) // sequence ids continue from the parent cversion, which also counted the 5 deletes above
+    }
+
+    numItems = 0
+    for(i <- 0 until 5) {
+      val item = stateChangeQ.take()
+      assertTrue(stateChangeQ.remove(item))
+      assertEquals("test:1:follower", item._2)
+      numItems += 1
+    }
+    assertEquals(5, numItems)
+  }
+
+  def testZkQueueFull() {
+    for(i <- 0 until 10) {
+      val itemPath = stateChangeQ.put("test:0:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    try {
+      stateChangeQ.put("test:0:follower")
+      fail("Queue should be full")
+    }catch {
+      case e:QueueFullException => // expected
+    }
+  }
+
+  def testStateChangeCommandJson() {
+    // test start replica
+    val topic = "foo"
+    val partition = 0
+    val epoch = 1
+
+    val startReplica = new StartReplica(topic, partition, epoch)
+    val startReplicaJson = startReplica.toJson()
+    val startReplicaFromJson = StateChangeCommand.getStateChangeRequest(startReplicaJson)
+    assertEquals(startReplica, startReplicaFromJson)
+
+    // test close replica
+    val closeReplica = new CloseReplica(topic, partition, epoch)
+    val closeReplicaJson = closeReplica.toJson()
+    val closeReplicaFromJson = StateChangeCommand.getStateChangeRequest(closeReplicaJson)
+    assertEquals(closeReplica, closeReplicaFromJson)
+  }
+
+  // TODO: Do this after patch for delete topic/delete partition is in
+  def testStateChangeRequestValidity() {
+    // mock out the StateChangeRequestHandler
+
+    // setup 3 replicas for one topic partition
+
+    // shutdown follower 1
+
+    // restart leader to trigger epoch change
+
+    // start follower 1
+
+    // test follower 1 acted only on one become follower request
+  }
+}
\ No newline at end of file