You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/04/24 01:33:00 UTC
svn commit: r1329509 - in /incubator/kafka/branches/0.8: bin/
core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/common/
core/src/main/scala/kafka/producer/
core/src/main/scala/kafka/producer/async/ core/src/main/scala/kafka/server/
core/src/ma...
Author: nehanarkhede
Date: Mon Apr 23 23:32:59 2012
New Revision: 1329509
URL: http://svn.apache.org/viewvc?rev=1329509&view=rev
Log:
KAFKA-301 Implement broker startup procedure; patched by Neha Narkhede; reviewed by Jun Rao and Jay Kreps
Added:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/QueueFullException.scala
Modified:
incubator/kafka/branches/0.8/bin/run-rat.sh (props changed)
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
Propchange: incubator/kafka/branches/0.8/bin/run-rat.sh
------------------------------------------------------------------------------
svn:executable = *
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Mon Apr 23 23:32:59 2012
@@ -27,6 +27,7 @@ import collection.mutable.HashMap
object AdminUtils extends Logging {
val rand = new Random
+ val AdminEpoch = -1
/**
* There are 2 goals of replica assignment:
@@ -69,7 +70,6 @@ object AdminUtils extends Logging {
replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
ret(i) = replicaList.reverse
}
-
ret
}
@@ -102,14 +102,14 @@ object AdminUtils extends Logging {
for (i <-0 until partitionMetadata.size) {
val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
- val inSyncReplicas = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.getTopicPartitionInSyncPath(topic, partitions(i).toString))
+ val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i))
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
partitionMetadata(i) = new PartitionMetadata(partitions(i),
leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
- getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(inSyncReplicas).map(id => id.toInt)),
+ getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
}
Some(new TopicMetadata(topic, partitionMetadata))
@@ -117,7 +117,6 @@ object AdminUtils extends Logging {
None
}
}
-
metadataList.toList
}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/NoEpochForPartitionException.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Thrown when the epoch of a partition is requested, but no epoch exists for that partition.
+ * The no-argument constructor creates the exception with a null message.
+ */
+class NoEpochForPartitionException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/QueueFullException.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,23 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Indicates that a queue is full and the offered item was rejected, e.g. the queue for sending messages is
+ * full of unsent messages, or a ZkQueue already holds its maximum number of items.
+ * The no-argument constructor creates the exception with a null message.
+ */
+class QueueFullException(message: String) extends RuntimeException(message) {
+ def this() = this(null)
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Mon Apr 23 23:32:59 2012
@@ -18,11 +18,11 @@ package kafka.producer
import async._
import kafka.utils._
-import kafka.common.InvalidConfigException
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.serializer.Encoder
import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
import org.I0Itec.zkclient.ZkClient
+import kafka.common.{QueueFullException, InvalidConfigException}
class Producer[K,V](config: ProducerConfig,
private val eventHandler: EventHandler[K,V]) // for testing only
@@ -120,6 +120,7 @@ extends Logging {
def close() = {
val canShutdown = hasShutdown.compareAndSet(false, true)
if(canShutdown) {
+ info("Shutting down producer")
if (producerSendThread != null)
producerSendThread.shutdown
eventHandler.close
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Mon Apr 23 23:32:59 2012
@@ -105,4 +105,7 @@ class KafkaConfig(props: Properties) ext
* leader election on all replicas minus the preferred replica */
val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
+ /* size of the state change request queue in Zookeeper */
+ val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
+
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Mon Apr 23 23:32:59 2012
@@ -23,6 +23,8 @@ import java.net.InetAddress
import kafka.common.{InvalidPartitionException, KafkaZookeeperClient}
import kafka.cluster.Replica
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import kafka.admin.AdminUtils
+import java.lang.{Thread, IllegalStateException}
/**
* Handles the server's interaction with zookeeper. The server needs to register the following paths:
@@ -36,11 +38,10 @@ class KafkaZooKeeper(config: KafkaConfig
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
private var zkClient: ZkClient = null
- var topics: List[String] = Nil
- val lock = new Object()
- var existingTopics: Set[String] = Set.empty[String]
- val leaderChangeListener = new LeaderChangeListener
- val topicPartitionsChangeListener = new TopicChangeListener
+ private val leaderChangeListener = new LeaderChangeListener
+ private val topicPartitionsChangeListener = new TopicChangeListener
+ private var stateChangeHandler: StateChangeCommandHandler = null
+
private val topicListenerLock = new Object
private val leaderChangeLock = new Object
@@ -48,6 +49,7 @@ class KafkaZooKeeper(config: KafkaConfig
/* start client */
info("connecting to ZK: " + config.zkConnect)
zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+ startStateChangeCommandHandler()
zkClient.subscribeStateChanges(new SessionExpireListener)
registerBrokerInZk()
subscribeToTopicAndPartitionsChanges(true)
@@ -60,6 +62,13 @@ class KafkaZooKeeper(config: KafkaConfig
ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
}
+  /**
+   * Creates the zookeeper backed state change queue of this broker and starts the handler thread that consumes
+   * the state change commands placed on it.
+   */
+  private def startStateChangeCommandHandler() {
+    val queuePath = ZkUtils.getBrokerStateChangePath(config.brokerId)
+    val stateChangeQ = new ZkQueue(zkClient, queuePath, config.stateChangeQSize)
+    stateChangeHandler = new StateChangeCommandHandler(
+      "StateChangeCommandHandler",
+      config,
+      stateChangeQ,
+      ensureStateChangeCommandValidityOnThisBroker,
+      ensureEpochValidity)
+    stateChangeHandler.start()
+  }
+
/**
* When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
* connection for us. We need to re-register this broker in the broker registry.
@@ -93,6 +102,7 @@ class KafkaZooKeeper(config: KafkaConfig
def close() {
if (zkClient != null) {
+ stateChangeHandler.shutdown()
info("Closing zookeeper client...")
zkClient.close()
}
@@ -184,7 +194,6 @@ class KafkaZooKeeper(config: KafkaConfig
case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partId, leader))
case None => // leader election
leaderElection(replica)
-
}
}
@@ -201,9 +210,12 @@ class KafkaZooKeeper(config: KafkaConfig
}catch {
case e => // ignoring
}
- if(ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)) {
- info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
- // TODO: Become leader as part of KAFKA-302
+ val newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic, replica.partition.partId, replica.brokerId)
+ newLeaderEpoch match {
+ case Some(epoch) =>
+ info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic, replica.partition.partId))
+ // TODO: Become leader as part of KAFKA-302
+ case None =>
}
}
}
@@ -233,6 +245,26 @@ class KafkaZooKeeper(config: KafkaConfig
}
}
+  /**
+   * Checks whether this broker hosts a replica for the topic and partition named in the state change command.
+   * @param stateChangeCommand the command whose topic and partition are checked
+   * @return the result of ZkUtils.isPartitionOnBroker for this broker's id
+   */
+  private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
+    val topic = stateChangeCommand.topic
+    val partition = stateChangeCommand.partition
+    ZkUtils.isPartitionOnBroker(zkClient, topic, partition, config.brokerId)
+  }
+
+  /**
+   * Validates the epoch carried by a state change command against the epoch registered for its partition.
+   * @param stateChangeCommand the command whose epoch is validated
+   * @return true if the command's epoch equals the registered epoch or is the admin epoch, false if it is older
+   * @throws IllegalStateException if the command's epoch is larger than the registered epoch
+   */
+  private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = {
+    val requestEpoch = stateChangeCommand.epoch
+    val registeredEpoch = ZkUtils.getEpochForPartition(zkClient, stateChangeCommand.topic, stateChangeCommand.partition)
+
+    // a request can never legitimately be ahead of the epoch registered in zookeeper
+    if(requestEpoch > registeredEpoch)
+      throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
+        "topic %s partition %d").format(requestEpoch, registeredEpoch, stateChangeCommand.topic, stateChangeCommand.partition))
+
+    // valid if it matches the current leader's epoch OR it is the admin command's epoch
+    requestEpoch == registeredEpoch || requestEpoch == AdminUtils.AdminEpoch
+  }
+
class LeaderChangeListener extends IZkDataListener with Logging {
@throws(classOf[Exception])
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import util.parsing.json.JSON
+import java.lang.IllegalStateException
+import kafka.utils.{Utils, Logging}
+import collection.mutable.HashMap
+
+/**
+ * JSON field names and state values of state change commands, plus the parser that turns a JSON request
+ * into the matching StateChangeCommand.
+ */
+object StateChangeCommand extends Logging {
+  val State = "state"
+  val Topic = "topic"
+  val Partition = "partition"
+  val Epoch = "epoch"
+  val StartReplica = "start-replica"
+  val CloseReplica = "close-replica"
+
+  /**
+   * Parses a state change request from its JSON form.
+   * A missing topic becomes null; a missing partition or epoch becomes -1.
+   * Any failure is logged and then rethrown to the caller.
+   * @param requestJson the JSON text of the request
+   * @return a StartReplica or CloseReplica command, depending on the "state" field
+   */
+  def getStateChangeRequest(requestJson: String): StateChangeCommand = {
+    try {
+      val fields = JSON.parseFull(requestJson) match {
+        case Some(parsed) => parsed.asInstanceOf[Map[String, String]]
+        case None => throw new RuntimeException("Error parsing state change request : " + requestJson)
+      }
+      val topic = fields.get(StateChangeCommand.Topic).getOrElse(null)
+      val partition = fields.get(StateChangeCommand.Partition).getOrElse("-1").toInt
+      val epoch = fields.get(StateChangeCommand.Epoch).getOrElse("-1").toInt
+      fields.get(StateChangeCommand.State) match {
+        case Some(StartReplica) => new StartReplica(topic, partition, epoch)
+        case Some(CloseReplica) => new CloseReplica(topic, partition, epoch)
+        case Some(unknown) => throw new IllegalStateException("Unknown state change request " + unknown)
+        case None => throw new IllegalStateException("Illegal state change request JSON " + requestJson)
+      }
+    } catch {
+      case e =>
+        error("Error parsing state change request JSON " + requestJson, e)
+        throw e
+    }
+  }
+}
+
+/**
+ * A state change request addressed to a broker for one topic partition.
+ */
+sealed trait StateChangeCommand extends Logging {
+  /** The value written to the "state" field of the JSON form */
+  def state: String
+  def topic: String
+  def partition: Int
+  def epoch: Int
+
+  /**
+   * Serializes the state, topic, partition and epoch of this command into a JSON object of string fields.
+   */
+  def toJson(): String = {
+    val fields = new HashMap[String, String]
+    fields += (StateChangeCommand.State -> state)
+    fields += (StateChangeCommand.Topic -> topic)
+    fields += (StateChangeCommand.Partition -> partition.toString)
+    fields += (StateChangeCommand.Epoch -> epoch.toString)
+    Utils.stringMapToJsonString(fields)
+  }
+}
+
+/**
+ * The elected leader sends the start replica state change request to all the new replicas that have been assigned
+ * a partition. Followers must act on this request only if the request epoch == latest partition epoch or -1.
+ */
+case class StartReplica(topic: String, partition: Int, epoch: Int) extends StateChangeCommand {
+  val state: String = StateChangeCommand.StartReplica
+}
+
+/* The elected leader sends the close replica state change request to all the replicas that have been un-assigned
+* a partition, OR when a topic has been deleted. Followers must act on this request even if the epoch has changed,
+* so the handler applies it without the epoch validity check. */
+case class CloseReplica(topic: String, partition: Int, epoch: Int) extends StateChangeCommand {
+ val state: String = StateChangeCommand.CloseReplica
+}
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommandHandler.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import kafka.utils.{ZkQueue, Logging}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.CountDownLatch
+
+/**
+ * Thread that consumes the state change commands queued for this broker and dispatches them.
+ * Each command is taken from the queue, parsed, handled and only then removed from zookeeper.
+ *
+ * @param name name of the handler thread
+ * @param config broker configuration; only the broker id is read here, for logging
+ * @param stateChangeQ the zookeeper queue holding this broker's state change commands
+ * @param ensureStateChangeCommandValidityOnThisBroker returns true if the command targets a partition on this broker
+ * @param ensureEpochValidity returns true if the command's epoch allows it to be applied
+ */
+class StateChangeCommandHandler(name: String, config: KafkaConfig, stateChangeQ: ZkQueue,
+                                ensureStateChangeCommandValidityOnThisBroker: (StateChangeCommand) => Boolean,
+                                ensureEpochValidity: (StateChangeCommand) => Boolean) extends Thread(name) with Logging {
+  val isRunning: AtomicBoolean = new AtomicBoolean(true)
+  private val shutdownLatch = new CountDownLatch(1)
+
+  override def run() {
+    try {
+      while(isRunning.get()) {
+        // get outstanding state change requests for this broker
+        val command = stateChangeQ.take()
+        val stateChangeCommand = StateChangeCommand.getStateChangeRequest(command._2)
+        // The outcome of this check was previously discarded. Dispatch is deliberately left unchanged (a close
+        // replica request may arrive after the partition was un-assigned), but a failed check is now visible.
+        if(!ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand))
+          warn("State change command %s targets a partition that is not assigned to broker %d"
+            .format(command._2, config.brokerId))
+
+        stateChangeCommand match {
+          case StartReplica(topic, partition, epoch) =>
+            if(ensureEpochValidity(stateChangeCommand))
+              handleStartReplica(topic, partition)
+          case CloseReplica(topic, partition, epoch) =>
+            /**
+             * close replica requests are sent as part of delete topic or partition reassignment process
+             * To ensure that a topic will be deleted even if the broker is offline, this state change should not
+             * be protected with the epoch validity check
+             */
+            handleCloseReplica(topic, partition)
+        }
+        stateChangeQ.remove(command)
+      }
+    }catch {
+      case e: InterruptedException => info("State change command handler interrupted. Shutting down")
+      case e1 => error("Error in state change command handler. Shutting down due to ", e1)
+    }finally {
+      // always release the latch, otherwise shutdown() would block forever if the handlers above fail
+      shutdownComplete()
+    }
+  }
+
+  private def shutdownComplete() = shutdownLatch.countDown
+
+  /**
+   * Stops the handler: clears the running flag, interrupts the thread and waits until run() has finished.
+   */
+  def shutdown() {
+    isRunning.set(false)
+    interrupt()
+    shutdownLatch.await()
+    info("State change command handler shutdown completed")
+  }
+
+  /** Logs the received start replica command; the actual handling is not implemented yet. */
+  def handleStartReplica(topic: String, partition: Int) {
+    info("Received start replica state change command for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of create topic support or partition reassignment support. Until then, it is unused
+  }
+
+  /** Logs the received close replica command; the actual handling is not implemented yet. */
+  def handleCloseReplica(topic: String, partition: Int) {
+    info("Received close replica state change command for topic %s partition %d on broker %d"
+      .format(topic, partition, config.brokerId))
+    // TODO: implement this as part of delete topic support. Until then, it is unused
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Apr 23 23:32:59 2012
@@ -700,6 +700,21 @@ object Utils extends Logging {
case _ => // swallow
}
}
+
+  /**
+   * Serializes a map of strings into a flat JSON object of the form { "key": "value","key2": "value2" }.
+   * Double quotes and backslashes inside keys and values are escaped so that the output stays valid JSON.
+   * A null key or value is rendered as the string "null".
+   * @param jsonDataMap the fields of the JSON object; an empty map yields an empty object
+   * @return the JSON text
+   */
+  def stringMapToJsonString(jsonDataMap: Map[String, String]): String = {
+    // backslashes are escaped first, so the backslashes added for quotes are not escaped a second time
+    def escape(s: String): String =
+      if (s == null) "null" else s.replace("\\", "\\\\").replace("\"", "\\\"")
+
+    jsonDataMap.map { case (key, value) =>
+      "\"" + escape(key) + "\": \"" + escape(value) + "\""
+    }.mkString("{ ", ",", " }")
+  }
}
class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkQueue.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.utils
+
+import kafka.utils.ZkUtils._
+import kafka.common.QueueFullException
+import org.I0Itec.zkclient.{IZkChildListener, ZkClient}
+import java.util.concurrent.PriorityBlockingQueue
+import java.util.Comparator
+
+/**
+ * A bounded queue whose items are stored as sequential persistent children of a zookeeper path.
+ * A child listener feeds the names of newly created children into an in-memory priority queue, from which
+ * take() hands them out in ascending sequence order.
+ *
+ * @param zkClient the zookeeper client used for all reads and writes
+ * @param path the zookeeper path under which the queue items are created
+ * @param size the maximum number of items the queue accepts
+ */
+class ZkQueue(zkClient: ZkClient, path: String, size: Int) {
+  // create the queue in ZK, if one does not exist
+  makeSurePersistentPathExists(zkClient, path)
+  // names of queue nodes reported by the listener that have not been handed out by take() yet
+  val queueItems = new PriorityBlockingQueue[String](size, new ZkQueueComparator)
+  // highest sequence number that ZkQueueListener has added to queueItems so far
+  var latestQueueItemPriority: Int = -1
+  // NOTE(review): the children returned by subscribeChildChanges are ignored, so items that already exist
+  // when the queue is created are presumably not consumed until the next child change fires - confirm
+  zkClient.subscribeChildChanges(path, new ZkQueueListener)
+
+  // TODO: This API will be used by the leader to enqueue state change requests to the followers
+  /**
+   * Inserts the specified element into this priority queue. This method will never block. If the queue is full,
+   * it will throw QueueFullException
+   * @param item Item to add to the zookeeper queue
+   * @return The zookeeper location of item in the queue
+   */
+  def put(item: String): String = {
+    // if queue is full, throw QueueFullException
+    if(isFull)
+      throw new QueueFullException("Queue is full. Item %s will be rejected".format(item))
+    val queueLocation = createSequentialPersistentPath(zkClient, path + "/", item)
+    debug("Added item %s to queue at location %s".format(item, queueLocation))
+    queueLocation
+  }
+
+  /**
+   * Reads all the items and their queue locations in this queue, without removing them
+   * @return A list of (queue_location, item) pairs, sorted by queue location
+   */
+  def readAll(): Seq[(String, String)] = {
+    getChildren(zkClient, path).sorted.map { item =>
+      (item, ZkUtils.readData(zkClient, path + "/" + item))
+    }
+  }
+
+  /**
+   * Returns true if this zookeeper queue contains no elements.
+   */
+  def isEmpty: Boolean = getChildren(zkClient, path).isEmpty
+
+  // TODO: Implement the queue shrink operation if the queue is full, as part of create/delete topic
+  /**
+   * Returns true if this zookeeper queue contains at least as many items as the size of the queue.
+   * The comparison is >= rather than ==, since writers that race past the limit must not make the queue
+   * look non-full again.
+   */
+  def isFull: Boolean = getChildren(zkClient, path).size >= size
+
+  /**
+   * Retrieves the head of this queue, waiting if necessary until an element becomes available. The head is
+   * removed from the in-memory queue only; its zookeeper node stays in place until remove() is called.
+   * @return The location of the head and the head element in the zookeeper queue
+   */
+  def take(): (String, String) = {
+    // take the element key
+    val item = queueItems.take()
+    val queueLocation = path + "/" + item
+    val data = ZkUtils.readData(zkClient, queueLocation)
+    (item, data)
+  }
+
+  /**
+   * Deletes the zookeeper node of an item that was previously returned by take().
+   * @param queueItem A tuple where the first element is the location of the item as returned by the take() API and the
+   *                  second element is the queue item to be removed
+   * @return the result of deleting the item's zookeeper path
+   */
+  def remove(queueItem: (String, String)): Boolean = {
+    val queueLocation = path + "/" + queueItem._1
+    // we do not want to remove items from the queue if they were not read
+    assert(!queueItems.contains(queueItem._1), "Attempt to remove unconsumed item %s from the queue".format(queueItem))
+    ZkUtils.deletePath(zkClient, queueLocation)
+  }
+
+  /**
+   * Adds every child with a sequence number above latestQueueItemPriority to the in-memory queue.
+   */
+  class ZkQueueListener extends IZkChildListener with Logging {
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      // zkclient reports a deleted parent path with a null child list; there is nothing to enqueue then
+      if(curChilds == null) {
+        debug("ZkQueue listener fired for queue %s without a list of children".format(path))
+      } else {
+        debug("ZkQueue listener fired for queue %s with children %s and latest queue item priority %d"
+          .format(path, curChilds.toString, latestQueueItemPriority))
+        import scala.collection.JavaConversions._
+        val outstandingRequests = asBuffer(curChilds).sortWith((req1, req2) => req1.toInt < req2.toInt)
+        outstandingRequests.foreach { req =>
+          val queueItemPriority = req.toInt
+          if(queueItemPriority > latestQueueItemPriority) {
+            latestQueueItemPriority = queueItemPriority
+            queueItems.add(req)
+            debug("Added item %s to queue %s".format(req, path))
+          }
+        }
+      }
+    }
+  }
+
+  /** Orders queue node names by their numeric sequence number. */
+  class ZkQueueComparator extends Comparator[String] {
+    def compare(element1: String, element2: String): Int = {
+      element1.toInt - element2.toInt
+    }
+  }
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Apr 23 23:32:59 2012
@@ -25,11 +25,13 @@ import org.I0Itec.zkclient.exception.{Zk
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import java.util.concurrent.locks.Condition
+import kafka.common.NoEpochForPartitionException
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
+ val BrokerStatePath = "/brokers/state"
def getTopicPath(topic: String): String ={
BrokerTopicsPath + "/" + topic
@@ -59,6 +61,10 @@ object ZkUtils extends Logging {
getTopicPartitionPath(topic, partitionId) + "/" + "leader"
}
+  /**
+   * Returns the zookeeper path of the state change queue of the given broker, i.e. BrokerStatePath/brokerId.
+   */
+  def getBrokerStateChangePath(brokerId: Int): String =
+    Seq(BrokerStatePath, brokerId.toString).mkString("/")
+
def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
}
@@ -69,9 +75,37 @@ object ZkUtils extends Logging {
}
def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
- val leader = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
- if(leader == null) None
- else Some(leader.toInt)
+ val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+ if(leaderAndEpoch == null) None
+ else {
+ val leaderAndEpochInfo = leaderAndEpoch.split(";")
+ Some(leaderAndEpochInfo.head.toInt)
+ }
+ }
+
+  /**
+   * This API should read the epoch in the ISR path. It is sufficient to read the epoch in the ISR path, since if the
+   * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
+   * other broker will retry becoming leader with the same new epoch value.
+   *
+   * The ISR path is expected to hold "replicas;epoch". Data without an epoch part is reported as a missing
+   * epoch instead of being parsed as if the replica list were the epoch.
+   * @return the epoch stored after the last ';' of the ISR path data
+   * @throws NoEpochForPartitionException if the ISR path does not exist, is empty or carries no epoch
+   */
+  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+    try {
+      val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
+      if(isrAndEpoch == null)
+        throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
+      // String.split drops trailing empty strings, so "1,2;" and "1,2" both yield fewer than two parts
+      val isrAndEpochInfo = isrAndEpoch.split(";")
+      if(isrAndEpochInfo.length < 2 || isrAndEpochInfo.last.isEmpty)
+        throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d".format(topic, partition))
+      isrAndEpochInfo.last.toInt
+    }catch {
+      case e: ZkNoNodeException =>
+        throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
+    }
   }
def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
@@ -83,22 +117,60 @@ object ZkUtils extends Logging {
}
}
+ /**
+  * Reads the in-sync replicas of a topic partition from its ISR path.
+  * The stored value is split on ";" and its first field is parsed as a comma separated list of broker ids;
+  * a null read yields an empty sequence.
+  */
+ def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
+   val isrPath = getTopicPartitionInSyncPath(topic, partition.toString)
+   readDataMaybeNull(client, isrPath) match {
+     case null => Seq.empty[Int]
+     case isrAndEpoch => Utils.getCSVList(isrAndEpoch.split(";").head).map(_.toInt)
+   }
+ }
+
def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
val replicas = getReplicasForPartition(zkClient, topic, partition)
debug("The list of replicas for topic %s, partition %d is %s".format(topic, partition, replicas))
replicas.contains(brokerId.toString)
}
- def tryToBecomeLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
+ /**
+  * Attempts to make broker brokerId the leader of the given topic partition: computes the next epoch,
+  * creates the ephemeral leader node holding "brokerId;epoch" and then rewrites the ISR path with the
+  * current in-sync replicas and the new epoch.
+  *
+  * @return Some(new epoch) if this broker became the leader; None if the leader node already exists or
+  *         any other exception occurred (the failure is logged)
+  */
+ def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[Int] = {
try {
- createEphemeralPathExpectConflict(zkClient, getTopicPartitionLeaderPath(topic, partition.toString), brokerId.toString)
- true
+ // NOTE: first increment epoch, then become leader
+ val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
+ createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
+ "%d;%d".format(brokerId, newEpoch))
+ val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+ updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
+ "%s;%d".format(currentISR.mkString(","), newEpoch))
+ info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
+ Some(newEpoch)
} catch {
- case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); false
- case oe => false
+ case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
+ case oe =>
+ // do not give up silently: record why this election attempt was abandoned
+ error("Failed to elect broker %d as leader for topic %s partition %d due to %s".format(brokerId, topic, partition, oe.toString))
+ None
}
}
+ /**
+  * Computes the next epoch for a topic partition: the epoch returned by getEpochForPartition plus one, or 1
+  * when no epoch exists yet (NoEpochForPartitionException). This method only computes the value; it does
+  * not write anything to ZooKeeper itself.
+  *
+  * @param leader id of the broker requesting the new epoch (not used by the computation)
+  * @return the new epoch
+  */
+ def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
+   val currentEpoch = try {
+     Some(getEpochForPartition(client, topic, partition))
+   } catch {
+     case e: NoEpochForPartitionException => None
+   }
+
+   currentEpoch match {
+     case Some(partitionEpoch) =>
+       // format the unwrapped Int: handing the Option itself to %d fails with IllegalFormatConversionException
+       debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
+       partitionEpoch + 1
+     case None =>
+       // this is the first time leader is elected for this partition. So set epoch to 1
+       debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
+       1
+   }
+ }
+
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, creator: String, port: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val broker = new Broker(id, creator, host, port)
@@ -186,7 +258,7 @@ object ZkUtils extends Logging {
/**
* Create an persistent node with the given path and data. Create parents if necessary.
*/
- def createPersistentPath(client: ZkClient, path: String, data: String): Unit = {
+ def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
try {
client.createPersistent(path, data)
}
@@ -198,6 +270,10 @@ object ZkUtils extends Logging {
}
}
+ /**
+  * Creates a persistent sequential node from the given path and data and returns the result of
+  * ZkClient.createPersistentSequential (the path of the node that was created).
+  */
+ def createSequentialPersistentPath(client: ZkClient, path: String, data: String = ""): String =
+   client.createPersistentSequential(path, data)
+
/**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
@@ -238,7 +314,7 @@ object ZkUtils extends Logging {
}
}
- def deletePath(client: ZkClient, path: String) {
+ def deletePath(client: ZkClient, path: String): Boolean = {
try {
client.delete(path)
}
@@ -246,6 +322,7 @@ object ZkUtils extends Logging {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
+ false
case e2 => throw e2
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Mon Apr 23 23:32:59 2012
@@ -24,7 +24,6 @@ import org.easymock.EasyMock
import org.junit.Test
import kafka.api._
import kafka.cluster.Broker
-import kafka.common.{InvalidConfigException, NoBrokersForPartitionException, InvalidPartitionException}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
import kafka.producer.async._
import kafka.serializer.{StringEncoder, StringDecoder, Encoder}
@@ -35,6 +34,7 @@ import collection.Map
import collection.mutable.ListBuffer
import org.scalatest.junit.JUnit3Suite
import kafka.utils.{NegativePartitioner, TestZKUtils, TestUtils}
+import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException, InvalidConfigException, QueueFullException}
class AsyncProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
@@ -56,7 +56,7 @@ class AsyncProducerTest extends JUnit3Su
val mockEventHandler = new EventHandler[String,String] {
def handle(events: Seq[ProducerData[String,String]]) {
- Thread.sleep(1000000)
+ Thread.sleep(500)
}
def close {}
@@ -79,6 +79,8 @@ class AsyncProducerTest extends JUnit3Su
}
catch {
case e: QueueFullException => //expected
+ }finally {
+ producer.close()
}
}
@@ -319,6 +321,8 @@ class AsyncProducerTest extends JUnit3Su
fail("Should fail with ClassCastException due to incompatible Encoder")
} catch {
case e: ClassCastException =>
+ }finally {
+ producer.close()
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1329509&r1=1329508&r2=1329509&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Mon Apr 23 23:32:59 2012
@@ -22,7 +22,7 @@ import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
import kafka.utils.TestUtils._
import junit.framework.Assert._
-import kafka.utils.{Utils, TestUtils}
+import kafka.utils.{ZkUtils, Utils, TestUtils}
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -35,27 +35,22 @@ class LeaderElectionTest extends JUnit3S
val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
- var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
override def setUp() {
super.setUp()
-
- // start both servers
- val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
- val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-
- servers ++= List(server1, server2)
}
override def tearDown() {
- // shutdown the servers and delete data hosted on them
- servers.map(server => server.shutdown())
- servers.map(server => Utils.rm(server.config.logDir))
-
super.tearDown()
}
def testLeaderElectionWithCreateTopic {
+ var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+ // start both servers
+ val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+ val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+
+ servers ++= List(server1, server2)
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
@@ -64,15 +59,16 @@ class LeaderElectionTest extends JUnit3S
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
// wait until leader is elected
- var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-
- assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+ var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 200)
+ assertTrue("Leader should get elected", leader.isDefined)
+ // NOTE: this is to avoid transient test failures
+ assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
// kill the server hosting the preferred replica
servers.head.shutdown()
// check if leader moves to the other server
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 5000)
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
Thread.sleep(zookeeper.tickTime)
@@ -81,7 +77,6 @@ class LeaderElectionTest extends JUnit3S
servers.head.startup()
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
- // TODO: Once the optimization for preferred replica re-election is in, this check should change to broker 0
assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
// shutdown current leader (broker 1)
@@ -90,5 +85,41 @@ class LeaderElectionTest extends JUnit3S
// test if the leader is the preferred replica
assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+ // shutdown the servers and delete data hosted on them
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDir))
+ }
+
+ // Assuming leader election happens correctly, verify that each new election raises the partition epoch by one
+ def testEpoch() {
+   val topic = "new-topic"
+   val partitionId = 0
+
+   // runs one election attempt for brokerId and checks both that it succeeded and the epoch it produced
+   def electAndVerify(brokerId: Int, expectedEpoch: Int, electedMsg: String, epochMsg: String) {
+     val electedEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, brokerId)
+     assertTrue(electedMsg, electedEpoch.isDefined)
+     assertEquals(epochMsg, expectedEpoch, electedEpoch.get)
+   }
+
+   // deletes the leader path so that the next election attempt does not find an existing leader
+   def dropCurrentLeader() {
+     ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+   }
+
+   // setup 2 brokers in ZK
+   TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
+
+   try {
+     // create topic with 1 partition, 2 replicas, one on each broker
+     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+     // keep switching leaders to see if epoch changes correctly
+     electAndVerify(0, 1, "Broker 0 should become leader", "First epoch value should be 1")
+     dropCurrentLeader()
+     electAndVerify(1, 2, "Broker 1 should become leader", "Second epoch value should be 2")
+     dropCurrentLeader()
+     electAndVerify(0, 3, "Broker 0 should become leader again", "Third epoch value should be 3")
+   } finally {
+     TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
+   }
+ }
}
\ No newline at end of file
Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala?rev=1329509&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/StateChangeTest.scala Mon Apr 23 23:32:59 2012
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package kafka.server
+
+import org.scalatest.junit.JUnit3Suite
+import kafka.zk.ZooKeeperTestHarness
+import kafka.common.QueueFullException
+import junit.framework.Assert._
+import kafka.utils.{ZkQueue, TestUtils}
+
+/**
+ * Tests for the ZooKeeper backed queue (ZkQueue) created under "/brokers/state/<brokerId>" and for the
+ * JSON round trip of state change commands.
+ */
+class StateChangeTest extends JUnit3Suite with ZooKeeperTestHarness {
+
+  val brokerId1 = 0
+  val port1 = TestUtils.choosePort()
+  // assigned in setUp(), so every test starts with a newly constructed queue object
+  var stateChangeQ: ZkQueue = null
+  val config = new KafkaConfig(TestUtils.createBrokerConfig(brokerId1, port1))
+
+  override def setUp() {
+    super.setUp()
+
+    // create a queue; the third argument (10) is the size at which testZkQueueFull expects the queue to be full
+    val queuePath = "/brokers/state/" + config.brokerId
+    stateChangeQ = new ZkQueue(zkClient, queuePath, 10)
+  }
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  /** Puts and drains two batches of 5 items, checking item ids on put and payloads on take. */
+  def testZkQueueDrainAll() {
+    for(i <- 0 until 5) {
+      val itemPath = stateChangeQ.put("test:0:follower")
+      // the numeric suffix after the last "-" of the returned path is the item id
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    var numItems: Int = 0
+    for(i <- 0 until 5) {
+      val item = stateChangeQ.take()
+      assertEquals("test:0:follower", item._2)
+      assertTrue(stateChangeQ.remove(item))
+      numItems += 1
+    }
+    assertEquals(5, numItems)
+
+    for(i <- 5 until 10) {
+      val itemPath = stateChangeQ.put("test:1:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      // NOTE(review): expected ids are 10..14 rather than 5..9, presumably because the ZooKeeper sequence
+      // counter also advanced on the 5 removals above - confirm
+      assertEquals(i+5, item)
+    }
+
+    numItems = 0
+    for(i <- 0 until 5) {
+      val item = stateChangeQ.take()
+      assertTrue(stateChangeQ.remove(item))
+      assertEquals("test:1:follower", item._2)
+      numItems += 1
+    }
+    assertEquals(5, numItems)
+  }
+
+  /** Fills the queue with 10 items and expects the 11th put to be rejected with QueueFullException. */
+  def testZkQueueFull() {
+    for(i <- 0 until 10) {
+      val itemPath = stateChangeQ.put("test:0:follower")
+      val item = itemPath.split("/").last.split("-").last.toInt
+      assertEquals(i, item)
+    }
+
+    try {
+      stateChangeQ.put("test:0:follower")
+      fail("Queue should be full")
+    }catch {
+      case e:QueueFullException => // expected
+    }
+  }
+
+  /** Serializes state change commands to JSON and checks that parsing the JSON yields an equal command. */
+  def testStateChangeCommandJson() {
+    // test start replica
+    val topic = "foo"
+    val partition = 0
+    val epoch = 1
+
+    val startReplica = new StartReplica(topic, partition, epoch)
+    val startReplicaJson = startReplica.toJson()
+    val startReplicaFromJson = StateChangeCommand.getStateChangeRequest(startReplicaJson)
+    assertEquals(startReplica, startReplicaFromJson)
+
+    // test close replica
+    // NOTE(review): this still builds a StartReplica; presumably the close-replica command type should be
+    // used here instead - confirm against StateChangeCommand.scala
+    val closeReplica = new StartReplica(topic, partition, epoch)
+    // serialize the command under test (previously startReplica was serialized again by mistake)
+    val closeReplicaJson = closeReplica.toJson()
+    val closeReplicaFromJson = StateChangeCommand.getStateChangeRequest(closeReplicaJson)
+    assertEquals(closeReplica, closeReplicaFromJson)
+  }
+
+  // TODO: Do this after patch for delete topic/delete partition is in
+  // Placeholder: the body only outlines the intended scenario and asserts nothing yet
+  def testStateChangeRequestValidity() {
+    // mock out the StateChangeRequestHandler
+
+    // setup 3 replicas for one topic partition
+
+    // shutdown follower 1
+
+    // restart leader to trigger epoch change
+
+    // start follower 1
+
+    // test follower 1 acted only on one become follower request
+  }
+}
\ No newline at end of file