You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/24 20:13:04 UTC
svn commit: r1365199 [2/3] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/
contrib/hadoop-producer/src/main/java/kafka/bridge/hadoop/
core/src/main/scala/kafka/admin/ core/src/main/scala/kafka/api/
core/src/main/sca...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Tue Jul 24 18:13:01 2012
@@ -50,7 +50,8 @@ class DefaultEventHandler[K,V](config: P
if (outstandingProduceRequests.size > 0) {
// back off and update the topic metadata cache before attempting another send operation
Thread.sleep(config.producerRetryBackoffMs)
- Utils.swallowError(brokerPartitionInfo.updateInfo())
+ // get topics of the outstanding produce requests and refresh metadata for those
+ Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
remainingRetries -= 1
}
}
@@ -62,66 +63,77 @@ class DefaultEventHandler[K,V](config: P
}
private def dispatchSerializedData(messages: Seq[ProducerData[K,Message]]): Seq[ProducerData[K, Message]] = {
- val partitionedData = partitionAndCollate(messages)
- val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
- try {
- for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
- if (logger.isTraceEnabled)
- eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
- .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
- val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
-
- val failedTopicPartitions = send(brokerid, messageSetPerBroker)
- for( (topic, partition) <- failedTopicPartitions ) {
- eventsPerBrokerMap.get((topic, partition)) match {
- case Some(data) => failedProduceRequests.appendAll(data)
- case None => // nothing
+ val partitionedDataOpt = partitionAndCollate(messages)
+ partitionedDataOpt match {
+ case Some(partitionedData) =>
+ val failedProduceRequests = new ListBuffer[ProducerData[K,Message]]
+ try {
+ for ((brokerid, eventsPerBrokerMap) <- partitionedData) {
+ if (logger.isTraceEnabled)
+ eventsPerBrokerMap.foreach(partitionAndEvent => trace("Handling event for Topic: %s, Broker: %d, Partitions: %s"
+ .format(partitionAndEvent._1, brokerid, partitionAndEvent._2)))
+ val messageSetPerBroker = groupMessagesToSet(eventsPerBrokerMap)
+
+ val failedTopicPartitions = send(brokerid, messageSetPerBroker)
+ for( (topic, partition) <- failedTopicPartitions ) {
+ eventsPerBrokerMap.get((topic, partition)) match {
+ case Some(data) => failedProduceRequests.appendAll(data)
+ case None => // nothing
+ }
+ }
}
+ } catch {
+ case t: Throwable => error("Failed to send messages")
}
- }
- } catch {
- case t: Throwable => error("Failed to send messages")
+ failedProduceRequests
+ case None => // all produce requests failed
+ messages
}
- failedProduceRequests
}
def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
}
- def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]] = {
+ def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
val ret = new HashMap[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]
- for (event <- events) {
- val topicPartitionsList = getPartitionListForTopic(event)
- val totalNumPartitions = topicPartitionsList.length
-
- val partitionIndex = getPartition(event.getKey, totalNumPartitions)
- val brokerPartition = topicPartitionsList(partitionIndex)
-
- // postpone the failure until the send operation, so that requests for other brokers are handled correctly
- val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
-
- var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
- ret.get(leaderBrokerId) match {
- case Some(element) =>
- dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
- case None =>
- dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
- ret.put(leaderBrokerId, dataPerBroker)
- }
+ try {
+ for (event <- events) {
+ val topicPartitionsList = getPartitionListForTopic(event)
+ val totalNumPartitions = topicPartitionsList.length
+
+ val partitionIndex = getPartition(event.getKey, totalNumPartitions)
+ val brokerPartition = topicPartitionsList(partitionIndex)
+
+ // postpone the failure until the send operation, so that requests for other brokers are handled correctly
+ val leaderBrokerId = brokerPartition.leaderId().getOrElse(-1)
+
+ var dataPerBroker: HashMap[(String, Int), Seq[ProducerData[K,Message]]] = null
+ ret.get(leaderBrokerId) match {
+ case Some(element) =>
+ dataPerBroker = element.asInstanceOf[HashMap[(String, Int), Seq[ProducerData[K,Message]]]]
+ case None =>
+ dataPerBroker = new HashMap[(String, Int), Seq[ProducerData[K,Message]]]
+ ret.put(leaderBrokerId, dataPerBroker)
+ }
- val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
- var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
- dataPerBroker.get(topicAndPartition) match {
- case Some(element) =>
- dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
- case None =>
- dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
- dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+ val topicAndPartition = (event.getTopic, brokerPartition.partitionId)
+ var dataPerTopicPartition: ListBuffer[ProducerData[K,Message]] = null
+ dataPerBroker.get(topicAndPartition) match {
+ case Some(element) =>
+ dataPerTopicPartition = element.asInstanceOf[ListBuffer[ProducerData[K,Message]]]
+ case None =>
+ dataPerTopicPartition = new ListBuffer[ProducerData[K,Message]]
+ dataPerBroker.put(topicAndPartition, dataPerTopicPartition)
+ }
+ dataPerTopicPartition.append(event)
}
- dataPerTopicPartition.append(event)
+ Some(ret)
+ }catch { // Swallow recoverable exceptions and return None so that they can be retried.
+ case ute: UnknownTopicException => warn("Failed to collate messages by topic,partition due to", ute); None
+ case lnae: LeaderNotAvailableException => warn("Failed to collate messages by topic,partition due to", lnae); None
+ case oe => error("Failed to collate messages by topic, partition due to", oe); throw oe
}
- ret
}
private def getPartitionListForTopic(pd: ProducerData[K,Message]): Seq[Partition] = {
@@ -144,12 +156,12 @@ class DefaultEventHandler[K,V](config: P
private def getPartition(key: K, numPartitions: Int): Int = {
if(numPartitions <= 0)
throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
- "\n Valid values are > 0")
+ "\n Valid values are > 0")
val partition = if(key == null) Utils.getNextRandomInt(numPartitions)
- else partitioner.partition(key, numPartitions)
+ else partitioner.partition(key, numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new InvalidPartitionException("Invalid partition id : " + partition +
- "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
+ "\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
partition
}
@@ -171,7 +183,8 @@ class DefaultEventHandler[K,V](config: P
partitionData.append(new PartitionData(partitionId, messagesSet))
}
val topicData = topics.map(kv => new TopicData(kv._1, kv._2.toArray)).toArray
- val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeoutMs, topicData)
+ val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks,
+ config.requestTimeoutMs, topicData)
try {
val syncProducer = producerPool.getProducer(brokerId)
val response = syncProducer.send(producerRequest)
@@ -204,46 +217,43 @@ class DefaultEventHandler[K,V](config: P
*/
val messagesPerTopicPartition = eventsPerTopicAndPartition.map { e =>
- {
- val topicAndPartition = e._1
- val produceData = e._2
- val messages = new ListBuffer[Message]
- produceData.map(p => messages.appendAll(p.getData))
-
- ( topicAndPartition,
- config.compressionCodec match {
- case NoCompressionCodec =>
- trace("Sending %d messages with no compression to topic %s on partition %d"
- .format(messages.size, topicAndPartition._1, topicAndPartition._2))
- new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
- case _ =>
- config.compressedTopics.size match {
- case 0 =>
+ val topicAndPartition = e._1
+ val produceData = e._2
+ val messages = new ListBuffer[Message]
+ produceData.map(p => messages.appendAll(p.getData))
+ ( topicAndPartition,
+ config.compressionCodec match {
+ case NoCompressionCodec =>
+ trace("Sending %d messages with no compression to topic %s on partition %d"
+ .format(messages.size, topicAndPartition._1, topicAndPartition._2))
+ new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+ case _ =>
+ config.compressedTopics.size match {
+ case 0 =>
+ trace("Sending %d messages with compression codec %d to topic %s on partition %d"
+ .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+ new ByteBufferMessageSet(config.compressionCodec, messages: _*)
+ case _ =>
+ if(config.compressedTopics.contains(topicAndPartition._1)) {
trace("Sending %d messages with compression codec %d to topic %s on partition %d"
- .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
+ .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
new ByteBufferMessageSet(config.compressionCodec, messages: _*)
- case _ =>
- if(config.compressedTopics.contains(topicAndPartition._1)) {
- trace("Sending %d messages with compression codec %d to topic %s on partition %d"
- .format(messages.size, config.compressionCodec.codec, topicAndPartition._1, topicAndPartition._2))
- new ByteBufferMessageSet(config.compressionCodec, messages: _*)
- }
- else {
- trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
- .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
- config.compressedTopics.toString))
- new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
- }
- }
- }
+ }
+ else {
+ trace("Sending %d messages to topic %s and partition %d with no compression as %s is not in compressed.topics - %s"
+ .format(messages.size, topicAndPartition._1, topicAndPartition._2, topicAndPartition._1,
+ config.compressedTopics.toString))
+ new ByteBufferMessageSet(NoCompressionCodec, messages: _*)
+ }
+ }
+ }
)
- }
}
messagesPerTopicPartition
}
def close() {
if (producerPool != null)
- producerPool.close
+ producerPool.close
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Tue Jul 24 18:13:01 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -30,7 +30,6 @@ import org.apache.log4j.Logger
import scala.collection._
import mutable.HashMap
import scala.math._
-import java.lang.IllegalStateException
import kafka.network.RequestChannel.Response
/**
@@ -48,8 +47,8 @@ class KafkaApis(val requestChannel: Requ
/**
* Top-level method that handles all requests and multiplexes to the right api
*/
- def handle(request: RequestChannel.Request) {
- val apiId = request.request.buffer.getShort()
+ def handle(request: RequestChannel.Request) {
+ val apiId = request.request.buffer.getShort()
apiId match {
case RequestKeys.Produce => handleProducerRequest(request)
case RequestKeys.Fetch => handleFetchRequest(request)
@@ -57,7 +56,7 @@ class KafkaApis(val requestChannel: Requ
case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request)
case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request)
- case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
+ case _ => throw new KafkaException("No mapping found for handler id " + apiId)
}
}
@@ -113,6 +112,7 @@ class KafkaApis(val requestChannel: Requ
val sTime = SystemTime.milliseconds
if(requestLogger.isTraceEnabled)
requestLogger.trace("Producer request " + request.toString)
+ trace("Broker %s received produce request %s".format(logManager.config.brokerId, produceRequest.toString))
val response = produceToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
@@ -172,7 +172,7 @@ class KafkaApis(val requestChannel: Requ
val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
- offsets(msgIndex) = log.nextAppendOffset
+ offsets(msgIndex) = log.logEndOffset
errors(msgIndex) = ErrorMapping.NoError.toShort
trace(partitionData.messages.sizeInBytes + " bytes written to logs.")
} catch {
@@ -201,7 +201,7 @@ class KafkaApis(val requestChannel: Requ
val fetchRequest = FetchRequest.readFrom(request.request.buffer)
if(requestLogger.isTraceEnabled)
requestLogger.trace("Fetch request " + fetchRequest.toString)
-
+ trace("Broker %s received fetch request %s".format(logManager.config.brokerId, fetchRequest.toString))
// validate the request
try {
fetchRequest.validate()
@@ -243,7 +243,7 @@ class KafkaApis(val requestChannel: Requ
fetchRequestPurgatory.watch(delayedFetch)
}
}
-
+
/**
* Calculate the number of available bytes for the given fetch request
*/
@@ -255,7 +255,7 @@ class KafkaApis(val requestChannel: Requ
debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
val maybeLog = logManager.getLog(offsetDetail.topic, offsetDetail.partitions(i))
val available = maybeLog match {
- case Some(log) => max(0, log.highwaterMark - offsetDetail.offsets(i))
+ case Some(log) => max(0, log.logEndOffset - offsetDetail.offsets(i))
case None => 0
}
totalBytes += math.min(offsetDetail.fetchSizes(i), available)
@@ -319,6 +319,8 @@ class KafkaApis(val requestChannel: Requ
val replica = replicaOpt.get
debug("Leader %d for topic %s partition %d received fetch request from follower %d"
.format(logManager.config.brokerId, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
+ debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+ .format(logManager.config.brokerId, messages.sizeInBytes, replica.topic, replica.partition.partitionId, fetchRequest.replicaId))
new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark(), messages)
}
}
@@ -382,36 +384,47 @@ class KafkaApis(val requestChannel: Requ
val topicsMetadata = new mutable.ArrayBuffer[TopicMetadata]()
val zkClient = kafkaZookeeper.getZookeeperClient
- val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+ var errorCode = ErrorMapping.NoError
+ val config = logManager.config
- metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
- val topic = topicAndMetadata._1
- topicAndMetadata._2 match {
- case Some(metadata) => topicsMetadata += metadata
- case None =>
- /* check if auto creation of topics is turned on */
- val config = logManager.config
- if(config.autoCreateTopics) {
- CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
- info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
- .format(topic, config.numPartitions, config.defaultReplicationFactor))
- val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
- newTopicMetadata match {
- case Some(topicMetadata) => topicsMetadata += topicMetadata
- case None =>
- throw new IllegalStateException("Topic metadata for automatically created topic %s does not exist".format(topic))
+ try {
+ val topicMetadataList = AdminUtils.getTopicMetaDataFromZK(metadataRequest.topics, zkClient)
+
+ metadataRequest.topics.zip(topicMetadataList).foreach { topicAndMetadata =>
+ val topic = topicAndMetadata._1
+ topicAndMetadata._2.errorCode match {
+ case ErrorMapping.NoError => topicsMetadata += topicAndMetadata._2
+ case ErrorMapping.UnknownTopicCode =>
+ /* check if auto creation of topics is turned on */
+ if(config.autoCreateTopics) {
+ CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
+ info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
+ .format(topic, config.numPartitions, config.defaultReplicationFactor))
+ val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
+ newTopicMetadata.errorCode match {
+ case ErrorMapping.NoError => topicsMetadata += newTopicMetadata
+ case _ =>
+ throw new KafkaException("Topic metadata for automatically created topic %s does not exist".format(topic))
+ }
}
- }
+ case _ => error("Error while fetching topic metadata for topic " + topic,
+ ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
+ }
}
+ }catch {
+ case e => error("Error while retrieving topic metadata", e)
+ // convert exception type to error code
+ errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
}
- val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq)
+ topicsMetadata.foreach(metadata => trace("Sending topic metadata " + metadata.toString))
+ val response = new TopicMetaDataResponse(metadataRequest.versionId, topicsMetadata.toSeq, errorCode)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
}
def close() {
fetchRequestPurgatory.shutdown()
}
-
+
/**
* A delayed fetch request
*/
@@ -423,7 +436,7 @@ class KafkaApis(val requestChannel: Requ
* A holding pen for fetch requests waiting to be satisfied
*/
class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, PartitionData] {
-
+
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
*/
@@ -432,7 +445,7 @@ class KafkaApis(val requestChannel: Requ
val accumulatedSize = delayedFetch.bytesAccumulated.addAndGet(messageDataSize)
accumulatedSize >= delayedFetch.fetch.minBytes
}
-
+
/**
* When a request expires just answer it with whatever data is present
*/
@@ -522,7 +535,7 @@ class KafkaApis(val requestChannel: Requ
numAcks, produce.requiredAcks,
topic, partitionId))
if ((produce.requiredAcks < 0 && numAcks >= isr.size) ||
- (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
+ (produce.requiredAcks > 0 && numAcks >= produce.requiredAcks)) {
/*
* requiredAcks < 0 means acknowledge after all replicas in ISR
* are fully caught up to the (local) leader's offset
@@ -563,7 +576,7 @@ class KafkaApis(val requestChannel: Requ
override def toString =
"acksPending:%b, error: %d, requiredOffset: %d".format(
- acksPending, error, requiredOffset
+ acksPending, error, requiredOffset
)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Tue Jul 24 18:13:01 2012
@@ -113,9 +113,9 @@ class KafkaConfig(props: Properties) ext
* leader election on all replicas minus the preferred replica */
val preferredReplicaWaitTime = Utils.getLong(props, "preferred.replica.wait.time", 300)
- val keepInSyncTimeMs = Utils.getLong(props, "isr.in.sync.time.ms", 30000)
+ val replicaMaxLagTimeMs = Utils.getLong(props, "replica.max.lag.time.ms", 10000)
- val keepInSyncBytes = Utils.getLong(props, "isr.in.sync.bytes", 4000)
+ val replicaMaxLagBytes = Utils.getLong(props, "replica.max.lag.bytes", 4000)
/* size of the state change request queue in Zookeeper */
val stateChangeQSize = Utils.getInt(props, "state.change.queue.size", 1000)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Tue Jul 24 18:13:01 2012
@@ -134,11 +134,15 @@ class ControllerChannelManager(allBroker
def removeBroker(brokerId: Int){
brokers.remove(brokerId)
- messageChannels(brokerId).disconnect()
- messageChannels.remove(brokerId)
- messageQueues.remove(brokerId)
- messageThreads(brokerId).shutDown()
- messageThreads.remove(brokerId)
+ try {
+ messageChannels(brokerId).disconnect()
+ messageChannels.remove(brokerId)
+ messageQueues.remove(brokerId)
+ messageThreads(brokerId).shutDown()
+ messageThreads.remove(brokerId)
+ }catch {
+ case e => error("Error while removing broker by the controller", e)
+ }
}
}
@@ -189,10 +193,10 @@ class KafkaController(config : KafkaConf
}
def shutDown() = {
- if(controllerChannelManager != null){
+ if(controllerChannelManager != null)
controllerChannelManager.shutDown()
- }
- zkClient.close()
+ if(zkClient != null)
+ zkClient.close()
}
def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Jul 24 18:13:01 2012
@@ -25,6 +25,7 @@ import java.util.concurrent._
import atomic.AtomicBoolean
import kafka.cluster.Replica
import org.I0Itec.zkclient.ZkClient
+import kafka.common.KafkaZookeeperClient
/**
@@ -44,6 +45,7 @@ class KafkaServer(val config: KafkaConfi
private var replicaManager: ReplicaManager = null
private var apis: KafkaApis = null
var kafkaController: KafkaController = new KafkaController(config)
+ var zkClient: ZkClient = null
/**
* Start up API for bringing up a single instance of the Kafka server.
@@ -59,6 +61,9 @@ class KafkaServer(val config: KafkaConfi
needRecovery = false
cleanShutDownFile.delete
}
+ /* start client */
+ info("connecting to ZK: " + config.zkConnect)
+ zkClient = KafkaZookeeperClient.getZookeeperClient(config)
logManager = new LogManager(config,
SystemTime,
1000L * 60 * config.logCleanupIntervalMinutes,
@@ -73,9 +78,9 @@ class KafkaServer(val config: KafkaConfi
config.maxSocketRequestSize)
Utils.registerMBean(socketServer.stats, statsMBeanName)
- kafkaZookeeper = new KafkaZooKeeper(config, addReplica, getReplica, makeLeader, makeFollower)
+ kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
- replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient)
+ replicaManager = new ReplicaManager(config, time, zkClient)
apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
@@ -112,6 +117,8 @@ class KafkaServer(val config: KafkaConfi
kafkaController.shutDown()
kafkaZookeeper.close
+ info("Closing zookeeper client...")
+ zkClient.close()
val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Jul 24 18:13:01 2012
@@ -23,7 +23,7 @@ import kafka.utils._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
import kafka.admin.AdminUtils
-import java.lang.{Thread, IllegalStateException}
+import java.lang.Thread
import collection.mutable.HashSet
import kafka.common._
@@ -34,13 +34,13 @@ import kafka.common._
*
*/
class KafkaZooKeeper(config: KafkaConfig,
+ zkClient: ZkClient,
addReplicaCbk: (String, Int, Set[Int]) => Replica,
getReplicaCbk: (String, Int) => Option[Replica],
becomeLeader: (Replica, Seq[Int]) => Unit,
becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
- private var zkClient: ZkClient = null
private var leaderChangeListener: LeaderChangeListener = null
private var topicPartitionsChangeListener: TopicChangeListener = null
private var stateChangeHandler: StateChangeCommandHandler = null
@@ -49,9 +49,8 @@ class KafkaZooKeeper(config: KafkaConfig
private val leaderChangeLock = new Object
def startup() {
- /* start client */
- info("connecting to ZK: " + config.zkConnect)
- zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+ leaderChangeListener = new LeaderChangeListener
+ topicPartitionsChangeListener = new TopicChangeListener
leaderChangeListener = new LeaderChangeListener
topicPartitionsChangeListener = new TopicChangeListener
startStateChangeCommandHandler()
@@ -106,11 +105,7 @@ class KafkaZooKeeper(config: KafkaConfig
}
def close() {
- if (zkClient != null) {
- stateChangeHandler.shutdown()
- info("Closing zookeeper client...")
- zkClient.close()
- }
+ stateChangeHandler.shutdown()
}
def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
@@ -122,10 +117,10 @@ class KafkaZooKeeper(config: KafkaConfig
ZkUtils.getLeaderForPartition(zkClient, topic, partition) match {
case Some(leader) =>
if(leader != config.brokerId)
- throw new NotLeaderForPartitionException("Broker %d is not leader for partition %d for topic %s"
+ throw new LeaderNotAvailableException("Broker %d is not leader for partition %d for topic %s"
.format(config.brokerId, partition, topic))
case None =>
- throw new NoLeaderForPartitionException("There is no leader for topic %s partition %d".format(topic, partition))
+ throw new LeaderNotAvailableException("There is no leader for topic %s partition %d".format(topic, partition))
}
}
@@ -345,7 +340,8 @@ class KafkaZooKeeper(config: KafkaConfig
val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
- debug("Leader change listener fired for path %s. New leader is %d. New epoch is %d".format(dataPath, newLeader, newEpoch))
+ debug("Leader change listener fired on broker %d for path %s. New leader is %d. New epoch is %d".format(config.brokerId,
+ dataPath, newLeader, newEpoch))
val topicPartitionInfo = dataPath.split("/")
val topic = topicPartitionInfo.takeRight(4).head
val partition = topicPartitionInfo.takeRight(2).head.toInt
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Tue Jul 24 18:13:01 2012
@@ -22,10 +22,10 @@ import kafka.cluster.Broker
import kafka.message.ByteBufferMessageSet
class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
- extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
- socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
- fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
- minBytes = brokerConfig.replicaMinBytes) {
+ extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
+ socketBufferSize = brokerConfig.replicaSocketBufferSize, fetchSize = brokerConfig.replicaFetchSize,
+ fetcherBrokerId = brokerConfig.brokerId, maxWait = brokerConfig.replicaMaxWaitTimeMs,
+ minBytes = brokerConfig.replicaMinBytes) {
// process fetched data
def processPartitionData(topic: String, fetchOffset: Long, partitionData: PartitionData) {
@@ -35,10 +35,15 @@ class ReplicaFetcherThread(name:String,
if (fetchOffset != replica.logEndOffset())
throw new RuntimeException("offset mismatch: fetchOffset=%d, logEndOffset=%d".format(fetchOffset, replica.logEndOffset()))
+ trace("Follower %d has replica log end offset %d. Received %d messages and leader hw %d".format(replica.brokerId,
+ replica.logEndOffset(), messageSet.sizeInBytes, partitionData.hw))
replica.log.get.append(messageSet)
- replica.highWatermark(Some(partitionData.hw))
- trace("follower %d set replica highwatermark for topic %s partition %d to %d"
- .format(replica.brokerId, topic, partitionId, partitionData.hw))
+ trace("Follower %d has replica log end offset %d after appending %d messages"
+ .format(replica.brokerId, replica.logEndOffset(), messageSet.sizeInBytes))
+ val followerHighWatermark = replica.logEndOffset().min(partitionData.hw)
+ replica.highWatermark(Some(followerHighWatermark))
+ trace("Follower %d set replica highwatermark for topic %s partition %d to %d"
+ .format(replica.brokerId, topic, partitionId, followerHighWatermark))
}
// handle a partition whose offset is out of range and return a new fetch offset
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue Jul 24 18:13:01 2012
@@ -19,12 +19,11 @@ package kafka.server
import kafka.log.Log
import kafka.cluster.{Partition, Replica}
import collection.mutable
-import java.lang.IllegalStateException
import mutable.ListBuffer
import org.I0Itec.zkclient.ZkClient
import java.util.concurrent.locks.ReentrantLock
import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
-import kafka.common.InvalidPartitionException
+import kafka.common.{KafkaException, InvalidPartitionException}
class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
@@ -36,7 +35,7 @@ class ReplicaManager(val config: KafkaCo
// start ISR expiration thread
isrExpirationScheduler.startUp
- isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.keepInSyncTimeMs)
+ isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.replicaMaxLagTimeMs)
def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
@@ -112,7 +111,7 @@ class ReplicaManager(val config: KafkaCo
case Some(replicas) =>
Some(replicas.leaderReplica())
case None =>
- throw new IllegalStateException("Getting leader replica failed. Partition replica metadata for topic " +
+ throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
"%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
}
}
@@ -136,15 +135,17 @@ class ReplicaManager(val config: KafkaCo
if(newHw > oldHw) {
debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
partition.leaderHW(Some(newHw))
- }
+ }else
+ debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
+ replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
}
def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
- // stop replica fetcher thread, if any
- replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// read and cache the ISR
replica.partition.leaderId(Some(replica.brokerId))
replica.partition.updateISR(currentISRInZk.toSet)
+ // stop replica fetcher thread, if any
+ replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
// also add this partition to the list of partitions for which the leader is the current broker
try {
leaderReplicaLock.lock()
@@ -157,6 +158,8 @@ class ReplicaManager(val config: KafkaCo
def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
info("broker %d intending to follow leader %d for topic %s partition %d"
.format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+ // set the leader for this partition correctly on this broker
+ replica.partition.leaderId(Some(leaderBrokerId))
// remove this replica's partition from the ISR expiration queue
try {
leaderReplicaLock.lock()
@@ -186,12 +189,11 @@ class ReplicaManager(val config: KafkaCo
def maybeShrinkISR(): Unit = {
try {
info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
- .format(config.keepInSyncTimeMs))
-
+ .format(config.replicaMaxLagTimeMs))
leaderReplicaLock.lock()
leaderReplicas.foreach { partition =>
// shrink ISR if a follower is slow or stuck
- val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.keepInSyncTimeMs, config.keepInSyncBytes)
+ val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
if(outOfSyncReplicas.size > 0) {
val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.size > 0)
@@ -215,7 +217,7 @@ class ReplicaManager(val config: KafkaCo
val leaderHW = partition.leaderHW()
replica.logEndOffset() >= leaderHW
}
- else throw new IllegalStateException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+ else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
" topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
}
@@ -230,9 +232,10 @@ class ReplicaManager(val config: KafkaCo
// update ISR in ZK and cache
replica.partition.updateISR(newISR.map(_.brokerId), Some(zkClient))
}
+ debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partition))
maybeIncrementLeaderHW(replica)
case None =>
- throw new IllegalStateException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
+ throw new KafkaException("No replica %d in replica manager on %d".format(replicaId, config.brokerId))
}
}
@@ -242,12 +245,14 @@ class ReplicaManager(val config: KafkaCo
case Some(replica) =>
replica.logEndOffsetUpdateTime(Some(time.milliseconds))
case None =>
- throw new IllegalStateException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
+ throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
}
}
def close() {
+ info("Closing replica manager on broker " + config.brokerId)
isrExpirationScheduler.shutdown()
replicaFetcherManager.shutdown()
+ info("Replica manager shutdown on broker " + config.brokerId)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/StateChangeCommand.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,7 @@
package kafka.server
import util.parsing.json.JSON
-import java.lang.IllegalStateException
+import kafka.common.KafkaException
import kafka.utils.{Utils, Logging}
import collection.mutable.HashMap
@@ -45,10 +45,10 @@ object StateChangeCommand extends Loggin
request match {
case StartReplica => new StartReplica(topic, partition, epoch)
case CloseReplica => new CloseReplica(topic, partition, epoch)
- case _ => throw new IllegalStateException("Unknown state change request " + request)
+ case _ => throw new KafkaException("Unknown state change request " + request)
}
case None =>
- throw new IllegalStateException("Illegal state change request JSON " + requestJson)
+ throw new KafkaException("Illegal state change request JSON " + requestJson)
}
case None => throw new RuntimeException("Error parsing state change request : " + requestJson)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala Tue Jul 24 18:13:01 2012
@@ -17,6 +17,9 @@
package kafka.utils
+import kafka.common.KafkaException
+import java.lang.IllegalStateException
+
class State
object DONE extends State
object READY extends State
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Tue Jul 24 18:13:01 2012
@@ -19,7 +19,6 @@ package kafka.utils
import java.util.concurrent._
import java.util.concurrent.atomic._
-import java.lang.IllegalStateException
/**
* A scheduler for running jobs in the background
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Tue Jul 24 18:13:01 2012
@@ -20,7 +20,7 @@ package kafka.utils
import org.I0Itec.zkclient.ZkClient
import kafka.consumer.{SimpleConsumer, ConsumerConfig}
import kafka.api.OffsetRequest
-import java.lang.IllegalStateException
+import kafka.common.KafkaException
/**
* A utility that updates the offset of every broker partition to the offset of earliest or latest log segment file, in ZK.
@@ -58,13 +58,13 @@ object UpdateOffsetsInZK {
val broker = brokerHostingPartition match {
case Some(b) => b
- case None => throw new IllegalStateException("Broker " + brokerHostingPartition + " is unavailable. Cannot issue " +
+ case None => throw new KafkaException("Broker " + brokerHostingPartition + " is unavailable. Cannot issue " +
"getOffsetsBefore request")
}
val brokerInfos = ZkUtils.getBrokerInfoFromIds(zkClient, List(broker))
if(brokerInfos.size == 0)
- throw new IllegalStateException("Broker information for broker id %d does not exist in ZK".format(broker))
+ throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
val brokerInfo = brokerInfos.head
val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Tue Jul 24 18:13:01 2012
@@ -30,6 +30,7 @@ import kafka.message.{NoCompressionCodec
import org.I0Itec.zkclient.ZkClient
import java.util.{Random, Properties}
import joptsimple.{OptionSpec, OptionSet, OptionParser}
+import kafka.common.KafkaException
/**
@@ -145,7 +146,7 @@ object Utils extends Logging {
if(string == null) {
buffer.putShort(-1)
} else if(string.length > Short.MaxValue) {
- throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+ throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
} else {
buffer.putShort(string.length.asInstanceOf[Short])
buffer.put(string.getBytes(encoding))
@@ -163,7 +164,7 @@ object Utils extends Logging {
} else {
val encodedString = string.getBytes(encoding)
if(encodedString.length > Short.MaxValue) {
- throw new IllegalArgumentException("String exceeds the maximum size of " + Short.MaxValue + ".")
+ throw new KafkaException("String exceeds the maximum size of " + Short.MaxValue + ".")
} else {
2 + encodedString.length
}
@@ -188,7 +189,7 @@ object Utils extends Logging {
if(props.containsKey(name))
return getInt(props, name, -1)
else
- throw new IllegalArgumentException("Missing required property '" + name + "'")
+ throw new KafkaException("Missing required property '" + name + "'")
}
/**
@@ -211,7 +212,7 @@ object Utils extends Logging {
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
- * @throws IllegalArgumentException If the value is not in the given range
+ * @throws KafkaException If the value is not in the given range
* @return the integer value
*/
def getIntInRange(props: Properties, name: String, default: Int, range: (Int, Int)): Int = {
@@ -221,7 +222,7 @@ object Utils extends Logging {
else
default
if(v < range._1 || v > range._2)
- throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+ throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
else
v
}
@@ -233,7 +234,7 @@ object Utils extends Logging {
else
default
if(v < range._1 || v > range._2)
- throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+ throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
else
v
}
@@ -241,21 +242,21 @@ object Utils extends Logging {
def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
val value = buffer.getInt
if(value < range._1 || value > range._2)
- throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+ throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
def getShortInRange(buffer: ByteBuffer, name: String, range: (Short, Short)): Short = {
val value = buffer.getShort
if(value < range._1 || value > range._2)
- throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+ throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
def getLongInRange(buffer: ByteBuffer, name: String, range: (Long, Long)): Long = {
val value = buffer.getLong
if(value < range._1 || value > range._2)
- throw new IllegalArgumentException(name + " has value " + value + " which is not in the range " + range + ".")
+ throw new KafkaException(name + " has value " + value + " which is not in the range " + range + ".")
else value
}
@@ -266,7 +267,7 @@ object Utils extends Logging {
if(props.containsKey(name))
return getLong(props, name, -1)
else
- throw new IllegalArgumentException("Missing required property '" + name + "'")
+ throw new KafkaException("Missing required property '" + name + "'")
}
/**
@@ -286,7 +287,7 @@ object Utils extends Logging {
* @param name The property name
* @param default The default value to use if the property is not found
* @param range The range in which the value must fall (inclusive)
- * @throws IllegalArgumentException If the value is not in the given range
+ * @throws KafkaException If the value is not in the given range
* @return the long value
*/
def getLongInRange(props: Properties, name: String, default: Long, range: (Long, Long)): Long = {
@@ -296,7 +297,7 @@ object Utils extends Logging {
else
default
if(v < range._1 || v > range._2)
- throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
+ throw new KafkaException(name + " has value " + v + " which is not in the range " + range + ".")
else
v
}
@@ -316,7 +317,7 @@ object Utils extends Logging {
else if("false" == props.getProperty(name))
false
else
- throw new IllegalArgumentException("Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false" )
+ throw new KafkaException("Unacceptable value for property '" + name + "', boolean values must be either 'true' or 'false" )
}
/**
@@ -336,7 +337,7 @@ object Utils extends Logging {
if(props.containsKey(name))
props.getProperty(name)
else
- throw new IllegalArgumentException("Missing required property '" + name + "'")
+ throw new KafkaException("Missing required property '" + name + "'")
}
/**
@@ -350,13 +351,13 @@ object Utils extends Logging {
for(i <- 0 until propValues.length) {
val prop = propValues(i).split("=")
if(prop.length != 2)
- throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+ throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'")
properties.put(prop(0), prop(1))
}
properties
}
else
- throw new IllegalArgumentException("Missing required property '" + name + "'")
+ throw new KafkaException("Missing required property '" + name + "'")
}
/**
@@ -367,12 +368,12 @@ object Utils extends Logging {
val propString = props.getProperty(name)
val propValues = propString.split(",")
if(propValues.length < 1)
- throw new IllegalArgumentException("Illegal format of specifying properties '" + propString + "'")
+ throw new KafkaException("Illegal format of specifying properties '" + propString + "'")
val properties = new Properties
for(i <- 0 until propValues.length) {
val prop = propValues(i).split("=")
if(prop.length != 2)
- throw new IllegalArgumentException("Illegal format of specifying properties '" + propValues(i) + "'")
+ throw new KafkaException("Illegal format of specifying properties '" + propValues(i) + "'")
properties.put(prop(0), prop(1))
}
properties
@@ -608,7 +609,7 @@ object Utils extends Logging {
def notNull[V](v: V) = {
if(v == null)
- throw new IllegalArgumentException("Value cannot be null.")
+ throw new KafkaException("Value cannot be null.")
else
v
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,6 @@
package kafka.utils
import java.util.Properties
-import java.util.concurrent.locks.Condition
import kafka.cluster.{Broker, Cluster}
import kafka.common.NoEpochForPartitionException
import kafka.consumer.TopicCount
@@ -27,6 +26,7 @@ import org.I0Itec.zkclient.exception.{Zk
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection._
import util.parsing.json.JSON
+import java.util.concurrent.locks.{ReentrantLock, Condition}
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
@@ -481,18 +481,29 @@ object ZkUtils extends Logging {
}
-class LeaderExists(topic: String, partition: Int, leaderExists: Condition) extends IZkDataListener {
+class LeaderExistsListener(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition) extends IZkDataListener {
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
val t = dataPath.split("/").takeRight(3).head
val p = dataPath.split("/").takeRight(2).head.toInt
- if(t == topic && p == partition)
- leaderExists.signal()
+ leaderLock.lock()
+ try {
+ if(t == topic && p == partition)
+ leaderExists.signal()
+ }
+ finally {
+ leaderLock.unlock()
+ }
}
@throws(classOf[Exception])
def handleDataDeleted(dataPath: String) {
- leaderExists.signal()
+ leaderLock.lock()
+ try {
+ leaderExists.signal()
+ }finally {
+ leaderLock.unlock()
+ }
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
+import kafka.common.ErrorMapping
import kafka.utils.TestUtils
class AdminTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -137,13 +138,28 @@ class AdminTest extends JUnit3Suite with
10 -> List("1", "2", "3"),
11 -> List("1", "3", "4")
)
- TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
-
+ val leaderForPartitionMap = Map(
+ 0 -> 0,
+ 1 -> 1,
+ 2 -> 2,
+ 3 -> 3,
+ 4 -> 4,
+ 5 -> 0,
+ 6 -> 1,
+ 7 -> 2,
+ 8 -> 3,
+ 9 -> 4,
+ 10 -> 1,
+ 11 -> 1
+ )
val topic = "test"
+ TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
// create the topic
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ // create leaders for all partitions
+ TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
- .get.partitionsMetadata.map(p => p.replicas)
+ .partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
for( i <- 0 until actualReplicaList.size ) {
@@ -165,23 +181,30 @@ class AdminTest extends JUnit3Suite with
0 -> List("0", "1", "2"),
1 -> List("1", "2", "3")
)
+ val leaderForPartitionMap = Map(
+ 0 -> 0,
+ 1 -> 1
+ )
val topic = "auto-topic"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ // create leaders for all partitions
+ TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
- newTopicMetadata match {
- case Some(metadata) =>
- assertEquals(topic, metadata.topic)
- assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
- assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
- val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
+ newTopicMetadata.errorCode match {
+ case ErrorMapping.UnknownTopicCode =>
+ fail("Topic " + topic + " should've been automatically created")
+ case _ =>
+ assertEquals(topic, newTopicMetadata.topic)
+ assertNotNull("partition metadata list cannot be null", newTopicMetadata.partitionsMetadata)
+ assertEquals("partition metadata list length should be 2", 2, newTopicMetadata.partitionsMetadata.size)
+ val actualReplicaAssignment = newTopicMetadata.partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
for(i <- 0 until actualReplicaList.size) {
assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
}
- case None => fail("Topic " + topic + " should've been automatically created")
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Tue Jul 24 18:13:01 2012
@@ -21,6 +21,7 @@ import kafka.server._
import kafka.utils.{Utils, TestUtils}
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
+import kafka.common.KafkaException
/**
* A test harness that brings up some number of broker nodes
@@ -33,7 +34,7 @@ trait KafkaServerTestHarness extends JUn
override def setUp() {
super.setUp
if(configs.size <= 0)
- throw new IllegalArgumentException("Must suply at least one server config.")
+ throw new KafkaException("Must suply at least one server config.")
servers = configs.map(TestUtils.createServer(_))
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -18,7 +18,6 @@
package kafka.integration
import kafka.api.FetchRequestBuilder
-import kafka.common.OffsetOutOfRangeException
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import org.apache.log4j.{Level, Logger}
@@ -26,6 +25,7 @@ import org.scalatest.junit.JUnit3Suite
import scala.collection._
import kafka.producer.ProducerData
import kafka.utils.TestUtils
+import kafka.common.{KafkaException, OffsetOutOfRangeException}
/**
* End to end tests of the primitive apis against a local server
@@ -41,7 +41,7 @@ class LazyInitProducerTest extends JUnit
override def setUp() {
super.setUp
if(configs.size <= 0)
- throw new IllegalArgumentException("Must suply at least one server config.")
+ throw new KafkaException("Must suply at least one server config.")
// temporarily set request handler logger to a higher level
requestHandlerLogger.setLevel(Level.FATAL)
@@ -133,6 +133,8 @@ class LazyInitProducerTest extends JUnit
produceList ::= new ProducerData[String, Message](topic, topic, set)
builder.addFetch(topic, 0, 0, 10000)
}
+ // wait until leader is elected
+ topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500))
producer.send(produceList: _*)
// wait a bit for produced message to be available
@@ -157,6 +159,9 @@ class LazyInitProducerTest extends JUnit
produceList ::= new ProducerData[String, Message](topic, topic, set)
builder.addFetch(topic, 0, 0, 10000)
}
+ // wait until leader is elected
+ topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1500))
+
producer.send(produceList: _*)
producer.send(produceList: _*)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Jul 24 18:13:01 2012
@@ -25,11 +25,12 @@ import kafka.log.LogManager
import junit.framework.Assert._
import org.easymock.EasyMock
import kafka.network._
-import kafka.api.{TopicMetaDataResponse, TopicMetadataRequest}
import kafka.cluster.Broker
import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.server.{ReplicaManager, KafkaZooKeeper, KafkaApis, KafkaConfig}
+import kafka.common.ErrorMapping
+import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -66,18 +67,38 @@ class TopicMetadataTest extends JUnit3Su
// create topic
val topic = "test"
CreateTopicCommand.createTopic(zkClient, topic, 1)
-
- mockLogManagerAndTestTopic(topic)
+ // set up leader for topic partition 0
+ val leaderForPartitionMap = Map(
+ 0 -> configs.head.brokerId
+ )
+ TestUtils.makeLeaderForPartition(zkClient, topic, leaderForPartitionMap)
+ val topicMetadata = mockLogManagerAndTestTopic(topic)
+ assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+ assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+ val partitionMetadata = topicMetadata.head.partitionsMetadata
+ assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+ assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+ assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+ assertEquals(1, partitionMetadata.head.replicas.size)
}
def testAutoCreateTopic {
// auto create topic
val topic = "test"
- mockLogManagerAndTestTopic(topic)
+ val topicMetadata = mockLogManagerAndTestTopic(topic)
+ assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
+ assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
+ val partitionMetadata = topicMetadata.head.partitionsMetadata
+ assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
+ assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
+ assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
+ assertEquals(0, partitionMetadata.head.replicas.size)
+ assertEquals(None, partitionMetadata.head.leader)
+ assertEquals(ErrorMapping.LeaderNotAvailableCode, partitionMetadata.head.errorCode)
}
- private def mockLogManagerAndTestTopic(topic: String) = {
+ private def mockLogManagerAndTestTopic(topic: String): Seq[TopicMetadata] = {
// topic metadata request only requires 2 APIs from the log manager
val logManager = EasyMock.createMock(classOf[LogManager])
val kafkaZookeeper = EasyMock.createMock(classOf[KafkaZooKeeper])
@@ -109,16 +130,11 @@ class TopicMetadataTest extends JUnit3Su
// check assertions
val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
- assertEquals("Expecting metadata only for 1 topic", 1, topicMetadata.size)
- assertEquals("Expecting metadata for the test topic", "test", topicMetadata.head.topic)
- val partitionMetadata = topicMetadata.head.partitionsMetadata
- assertEquals("Expecting metadata for 1 partition", 1, partitionMetadata.size)
- assertEquals("Expecting partition id to be 0", 0, partitionMetadata.head.partitionId)
- assertEquals(brokers, partitionMetadata.head.replicas)
- assertNull("Not expecting log metadata", partitionMetadata.head.logMetadata.getOrElse(null))
// verify the expected calls to log manager occurred in the right order
EasyMock.verify(kafkaZookeeper)
EasyMock.verify(receivedRequest)
+
+ topicMetadata
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala Tue Jul 24 18:13:01 2012
@@ -23,8 +23,8 @@ import junit.framework.Assert._
import org.scalatest.junit.JUnitSuite
import org.junit.{After, Before, Test}
import kafka.utils.{Utils, TestUtils, Range}
-import kafka.common.OffsetOutOfRangeException
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.common.{KafkaException, OffsetOutOfRangeException}
class LogTest extends JUnitSuite {
@@ -58,7 +58,7 @@ class LogTest extends JUnitSuite {
new Log(logDir, 1024, 1000, false)
fail("Allowed load of corrupt logs without complaint.")
} catch {
- case e: IllegalStateException => "This is good"
+ case e: KafkaException => "This is good"
}
}
@@ -157,14 +157,14 @@ class LogTest extends JUnitSuite {
{
// first test a log segment starting at 0
val log = new Log(logDir, 100, 1000, false)
- val curOffset = log.nextAppendOffset
+ val curOffset = log.logEndOffset
assertEquals(curOffset, 0)
// time goes by; the log file is deleted
log.markDeletedWhile(_ => true)
// we now have a new log; the starting offset of the new log should remain 0
- assertEquals(curOffset, log.nextAppendOffset)
+ assertEquals(curOffset, log.logEndOffset)
}
{
@@ -174,12 +174,12 @@ class LogTest extends JUnitSuite {
for(i <- 0 until numMessages)
log.append(TestUtils.singleMessageSet(Integer.toString(i).getBytes()))
- val curOffset = log.nextAppendOffset
+ val curOffset = log.logEndOffset
// time goes by; the log file is deleted
log.markDeletedWhile(_ => true)
// we now have a new log
- assertEquals(curOffset, log.nextAppendOffset)
+ assertEquals(curOffset, log.logEndOffset)
// time goes by; the log file (which is empty) is deleted again
val deletedSegments = log.markDeletedWhile(_ => true)
@@ -188,7 +188,7 @@ class LogTest extends JUnitSuite {
assertTrue("We shouldn't delete the last empty log segment", deletedSegments.size == 0)
// we now have a new log
- assertEquals(curOffset, log.nextAppendOffset)
+ assertEquals(curOffset, log.logEndOffset)
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/SegmentListTest.scala Tue Jul 24 18:13:01 2012
@@ -20,6 +20,7 @@ package kafka.log
import junit.framework.Assert._
import org.junit.Test
import org.scalatest.junit.JUnitSuite
+import kafka.common.KafkaException
class SegmentListTest extends JUnitSuite {
@@ -74,7 +75,7 @@ class SegmentListTest extends JUnitSuite
sl.truncLast(-1)
fail("Attempt to truncate with illegal index should fail")
}catch {
- case e: IllegalArgumentException => // this is ok
+ case e: KafkaException => // this is ok
}
val deleted = sl.truncLast(4)
assertEquals(hd, sl.view.iterator.toList)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -212,14 +212,14 @@ class AsyncProducerTest extends JUnit3Su
topic2Broker1Data.appendAll(List(new ProducerData[Int,Message]("topic2", 4, new Message("msg5".getBytes))))
val topic2Broker2Data = new ListBuffer[ProducerData[Int,Message]]
topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
- val expectedResult = Map(
+ val expectedResult = Some(Map(
0 -> Map(
("topic1", 0) -> topic1Broker1Data,
("topic2", 0) -> topic2Broker1Data),
1 -> Map(
("topic1", 1) -> topic1Broker2Data,
("topic2", 1) -> topic2Broker2Data)
- )
+ ))
val actualResult = handler.partitionAndCollate(producerDataList)
assertEquals(expectedResult, actualResult)
@@ -356,10 +356,15 @@ class AsyncProducerTest extends JUnit3Su
producerDataList.append(new ProducerData[String,Message]("topic2", new Message("msg2".getBytes)))
producerDataList.append(new ProducerData[String,Message]("topic1", new Message("msg3".getBytes)))
- val partitionedData = handler.partitionAndCollate(producerDataList)
- for ((brokerId, dataPerBroker) <- partitionedData) {
- for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
- assertTrue(partitionId == 0)
+ val partitionedDataOpt = handler.partitionAndCollate(producerDataList)
+ partitionedDataOpt match {
+ case Some(partitionedData) =>
+ for ((brokerId, dataPerBroker) <- partitionedData) {
+ for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
+ assertTrue(partitionId == 0)
+ }
+ case None =>
+ fail("Failed to collate requests by topic, partition")
}
EasyMock.verify(producerPool)
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -92,7 +92,7 @@ class ProducerTest extends JUnit3Suite w
props1.put("partitioner.class", "kafka.utils.StaticPartitioner")
props1.put("zk.connect", TestZKUtils.zookeeperConnect)
props1.put("producer.request.required.acks", "2")
- props1.put("producer.request.ack.timeout.ms", "-1")
+ props1.put("producer.request.timeout.ms", "1000")
val props2 = new util.Properties()
props2.putAll(props1)
@@ -155,7 +155,7 @@ class ProducerTest extends JUnit3Suite w
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
- props.put("socket.timeout.ms", "2000")
+ props.put("producer.request.timeout.ms", "2000")
props.put("zk.connect", TestZKUtils.zookeeperConnect)
// create topic
@@ -213,7 +213,7 @@ class ProducerTest extends JUnit3Suite w
val props = new Properties()
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("partitioner.class", "kafka.utils.StaticPartitioner")
- props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+ props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Tue Jul 24 18:13:01 2012
@@ -153,7 +153,7 @@ class SyncProducerTest extends JUnit3Sui
Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
- // the middle message should have been rejected because the topic does not exist
+ // the middle message should have been rejected because broker doesn't lead partition
Assert.assertEquals(ErrorMapping.UnknownTopicCode.toShort, response2.errors(1))
Assert.assertEquals(-1, response2.offsets(1))
}
@@ -167,7 +167,7 @@ class SyncProducerTest extends JUnit3Sui
props.put("host", "localhost")
props.put("port", server.socketServer.port.toString)
props.put("buffer.size", "102400")
- props.put("socket.timeout.ms", String.valueOf(timeoutMs))
+ props.put("producer.request.timeout.ms", String.valueOf(timeoutMs))
val producer = new SyncProducer(new SyncProducerConfig(props))
val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message(messageBytes))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Tue Jul 24 18:13:01 2012
@@ -30,8 +30,8 @@ class ISRExpirationTest extends JUnit3Su
var topicPartitionISR: Map[(String, Int), Seq[Int]] = new HashMap[(String, Int), Seq[Int]]()
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
- override val keepInSyncTimeMs = 100L
- override val keepInSyncBytes = 10L
+ override val replicaMaxLagTimeMs = 100L
+ override val replicaMaxLagBytes = 10L
})
val topic = "foo"
@@ -53,7 +53,7 @@ class ISRExpirationTest extends JUnit3Su
time.sleep(150)
leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
- var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
// add all replicas back to the ISR
@@ -65,8 +65,8 @@ class ISRExpirationTest extends JUnit3Su
(partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
time.sleep(150)
// now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
- // pulled any data for > keepInSyncTimeMs ms. So it is stuck
- partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
+ partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
EasyMock.verify(log)
}
@@ -87,7 +87,7 @@ class ISRExpirationTest extends JUnit3Su
time.sleep(10)
(partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
- val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
EasyMock.verify(log)
@@ -137,10 +137,10 @@ class ISRExpirationTest extends JUnit3Su
time.sleep(10)
(partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
- val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
- val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.keepInSyncTimeMs, configs.head.keepInSyncBytes)
+ val partition1OSR = partition1.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
assertEquals("Replica 0 should be out of sync", Set(configs.last.brokerId), partition1OSR.map(_.brokerId))
EasyMock.verify(log0)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1365199&r1=1365198&r2=1365199&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Tue Jul 24 18:13:01 2012
@@ -13,7 +13,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package kafka.server
@@ -51,7 +51,6 @@ class LeaderElectionTest extends JUnit3S
val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
servers ++= List(server1, server2)
- try {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
@@ -66,14 +65,13 @@ class LeaderElectionTest extends JUnit3S
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
// kill the server hosting the preferred replica
- servers.head.shutdown()
+ server1.shutdown()
// check if leader moves to the other server
leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
- Thread.sleep(zookeeper.tickTime)
-
+ val leaderPath = zkClient.getChildren(ZkUtils.getTopicPartitionPath(topic, "0"))
// bring the preferred replica back
servers.head.startup()
@@ -86,14 +84,9 @@ class LeaderElectionTest extends JUnit3S
// test if the leader is the preferred replica
assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
- }catch {
- case e => error("Error while running leader election test ", e)
- } finally {
- // shutdown the servers and delete data hosted on them
- servers.map(server => server.shutdown())
- servers.map(server => Utils.rm(server.config.logDir))
-
- }
+ // shutdown the servers and delete data hosted on them
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDir))
}
// Assuming leader election happens correctly, test if epoch changes as expected
@@ -105,27 +98,23 @@ class LeaderElectionTest extends JUnit3S
// setup 2 brokers in ZK
val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
- try {
- // create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
-
- var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
- assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
- assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
-
- ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
- newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
- assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
- assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
-
- ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
- newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
- assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
- assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
-
- }finally {
- TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
- }
+ // create topic with 1 partition, 2 replicas, one on each broker
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+ var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+ assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+ assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
+
+ ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+ newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+ assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+ assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
+
+ ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+ newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+ assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+ assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
+ TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
}
}
\ No newline at end of file