You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2012/09/17 22:16:00 UTC
svn commit: r1386806 [2/2] - in /incubator/kafka/branches/0.8:
contrib/hadoop-consumer/src/main/java/kafka/etl/impl/
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/common/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Sep 17 20:15:59 2012
@@ -20,7 +20,6 @@ package kafka.server
import java.io.IOException
import kafka.admin.{CreateTopicCommand, AdminUtils}
import kafka.api._
-import kafka.common._
import kafka.message._
import kafka.network._
import kafka.utils.{TopicNameValidator, Pool, SystemTime, Logging}
@@ -32,6 +31,7 @@ import kafka.network.RequestChannel.Resp
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import org.I0Itec.zkclient.ZkClient
+import kafka.common._
/**
* Logic to handle the various Kafka requests
@@ -40,13 +40,14 @@ class KafkaApis(val requestChannel: Requ
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
brokerId: Int) extends Logging {
- private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
- private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
+
+ private val producerRequestPurgatory = new ProducerRequestPurgatory
+ private val fetchRequestPurgatory = new FetchRequestPurgatory(requestChannel)
private val delayedRequestMetrics = new DelayedRequestMetrics
private val topicNameValidator = new TopicNameValidator(replicaManager.config.maxTopicNameLength)
private val requestLogger = Logger.getLogger("kafka.request.logger")
- this.logIdent = "[KafkaApi on Broker " + brokerId + "], "
+ this.logIdent = "[KafkaApi-%d] ".format(brokerId)
/**
* Top-level method that handles all requests and multiplexes to the right api
@@ -93,18 +94,18 @@ class KafkaApis(val requestChannel: Requ
}
/**
- * Check if the partitionDataArray from a produce request can unblock any
+ * Check if a partitionData from a produce request can unblock any
* DelayedFetch requests.
*/
- def maybeUnblockDelayedFetchRequests(topic: String, partitionDatas: Array[PartitionData]) {
- var satisfied = new mutable.ArrayBuffer[DelayedFetch]
- for(partitionData <- partitionDatas)
- satisfied ++= fetchRequestPurgatory.update(RequestKey(topic, partitionData.partition), null)
- trace("Producer request to %s unblocked %d fetch requests.".format(topic, satisfied.size))
+ def maybeUnblockDelayedFetchRequests(topic: String, partitionData: PartitionData) {
+ val partition = partitionData.partition
+ val satisfied = fetchRequestPurgatory.update(RequestKey(topic, partition), null)
+ trace("Producer request to (%s-%d) unblocked %d fetch requests.".format(topic, partition, satisfied.size))
+
// send any newly unblocked responses
for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch)
- val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
+ val response = FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
}
}
@@ -119,28 +120,25 @@ class KafkaApis(val requestChannel: Requ
requestLogger.trace("Handling producer request " + request.toString)
trace("Handling producer request " + request.toString)
- val response = produceToLocalLog(produceRequest)
+ val localProduceResponse = produceToLocalLog(produceRequest)
debug("Produce to local log in %d ms".format(SystemTime.milliseconds - sTime))
- val partitionsInError = response.errors.count(_ != ErrorMapping.NoError)
-
- for (topicData <- produceRequest.data)
- maybeUnblockDelayedFetchRequests(topicData.topic, topicData.partitionDataArray)
-
- if (produceRequest.requiredAcks == 0 || produceRequest.requiredAcks == 1 ||
- produceRequest.data.size <= 0 || partitionsInError == response.errors.size)
- requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+
+ val numPartitionsInError = localProduceResponse.status.count(_._2.error != ErrorMapping.NoError)
+ produceRequest.data.foreach(partitionAndData =>
+ maybeUnblockDelayedFetchRequests(partitionAndData._1.topic, partitionAndData._2))
+
+ if (produceRequest.requiredAcks == 0 ||
+ produceRequest.requiredAcks == 1 ||
+ produceRequest.numPartitions <= 0 ||
+ numPartitionsInError == produceRequest.numPartitions)
+ requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(localProduceResponse)))
else {
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val producerRequestKeys = produceRequest.data.flatMap(topicData => {
- val topic = topicData.topic
- topicData.partitionDataArray.map(partitionData => {
- RequestKey(topic, partitionData.partition)
- })
- })
+ val producerRequestKeys = produceRequest.data.keys.map(
+ topicAndPartition => new RequestKey(topicAndPartition)).toSeq
val delayedProduce = new DelayedProduce(
- producerRequestKeys, request,
- response.errors, response.offsets,
+ producerRequestKeys, request, localProduceResponse,
produceRequest, produceRequest.ackTimeoutMs.toLong)
producerRequestPurgatory.watch(delayedProduce)
@@ -164,43 +162,41 @@ class KafkaApis(val requestChannel: Requ
*/
private def produceToLocalLog(request: ProducerRequest): ProducerResponse = {
trace("Produce [%s] to local log ".format(request.toString))
- val requestSize = request.topicPartitionCount
- val errors = new Array[Short](requestSize)
- val offsets = new Array[Long](requestSize)
-
- var msgIndex = -1
- for(topicData <- request.data) {
- for(partitionData <- topicData.partitionDataArray) {
- msgIndex += 1
- BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
- BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
- try {
- val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
- val log = localReplica.log.get
- log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
- // we may need to increment high watermark since ISR could be down to 1
- localReplica.partition.maybeIncrementLeaderHW(localReplica)
- offsets(msgIndex) = log.logEndOffset
- errors(msgIndex) = ErrorMapping.NoError.toShort
- trace("%d bytes written to logs, nextAppendOffset = %d"
- .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
- } catch {
- case e =>
- BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark()
- BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
- error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
- e match {
- case _: IOException =>
- fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
- System.exit(1)
- case _ =>
- errors(msgIndex) = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort
- offsets(msgIndex) = -1
- }
- }
+
+ val localErrorsAndOffsets = request.data.map (topicAndPartitionData => {
+ val (topic, partitionData) = (topicAndPartitionData._1.topic, topicAndPartitionData._2)
+ BrokerTopicStat.getBrokerTopicStat(topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
+ BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
+
+ try {
+ val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partitionData.partition)
+ val log = localReplica.log.get
+ log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
+ // we may need to increment high watermark since ISR could be down to 1
+ localReplica.partition.maybeIncrementLeaderHW(localReplica)
+ val responseStatus = ProducerResponseStatus(ErrorMapping.NoError, log.logEndOffset)
+ trace("%d bytes written to logs, nextAppendOffset = %d"
+ .format(partitionData.messages.sizeInBytes, responseStatus.nextOffset))
+ (TopicAndPartition(topic, partitionData.partition), responseStatus)
+ } catch {
+ case e: Throwable =>
+ BrokerTopicStat.getBrokerTopicStat(topic).failedProduceRequestRate.mark()
+ BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
+ error("Error processing ProducerRequest on %s:%d".format(topic, partitionData.partition), e)
+ e match {
+ case _: IOException =>
+ fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
+ // compiler requires scala.sys.exit (not System.exit).
+ exit(1)
+ case _ =>
+ val (error, offset) = (ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1L)
+ (TopicAndPartition(topic, partitionData.partition), ProducerResponseStatus(error, offset))
+ }
}
}
- new ProducerResponse(request.versionId, request.correlationId, errors, offsets)
+ )
+
+ ProducerResponse(request.versionId, request.correlationId, localErrorsAndOffsets)
}
/**
@@ -212,27 +208,17 @@ class KafkaApis(val requestChannel: Requ
requestLogger.trace("Handling fetch request " + fetchRequest.toString)
trace("Handling fetch request " + fetchRequest.toString)
- // validate the request
- try {
- fetchRequest.validate()
- } catch {
- case e:FetchRequestFormatException =>
- val response = new FetchResponse(fetchRequest.versionId, fetchRequest.correlationId, Array.empty)
- val channelResponse = new RequestChannel.Response(request, new FetchResponseSend(response))
- requestChannel.sendResponse(channelResponse)
- }
-
if(fetchRequest.isFromFollower) {
maybeUpdatePartitionHW(fetchRequest)
// after updating HW, some delayed produce requests may be unblocked
var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
- fetchRequest.offsetInfo.foreach(topicOffsetInfo => {
- topicOffsetInfo.partitions.foreach(partition => {
- val key = RequestKey(topicOffsetInfo.topic, partition)
+ fetchRequest.requestInfo.foreach {
+ case (topicAndPartition, _) =>
+ val key = new RequestKey(topicAndPartition)
satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
- })
- })
- debug("Replica %d fetch unblocked %d producer requests.".format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+ }
+ debug("Replica %d fetch unblocked %d producer requests."
+ .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
satisfiedProduceRequests.foreach(_.respond())
}
@@ -243,13 +229,13 @@ class KafkaApis(val requestChannel: Requ
fetchRequest.numPartitions <= 0) {
val topicData = readMessageSets(fetchRequest)
debug("Returning fetch response %s for fetch request with correlation id %d".format(
- topicData.map(_.partitionDataArray.map(_.error).mkString(",")).mkString(","), fetchRequest.correlationId))
- val response = new FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
+ topicData.values.map(_.error).mkString(","), fetchRequest.correlationId))
+ val response = FetchResponse(FetchRequest.CurrentVersion, fetchRequest.correlationId, topicData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(response)))
} else {
debug("Putting fetch request into purgatory")
// create a list of (topic, partition) pairs to use as keys for this delayed request
- val delayedFetchKeys = fetchRequest.offsetInfo.flatMap(o => o.partitions.map(RequestKey(o.topic, _)))
+ val delayedFetchKeys = fetchRequest.requestInfo.keys.toSeq.map(new RequestKey(_))
val delayedFetch = new DelayedFetch(delayedFetchKeys, request, fetchRequest, fetchRequest.maxWait)
fetchRequestPurgatory.watch(delayedFetch)
}
@@ -259,86 +245,77 @@ class KafkaApis(val requestChannel: Requ
* Calculate the number of available bytes for the given fetch request
*/
private def availableFetchBytes(fetchRequest: FetchRequest): Long = {
- var totalBytes = 0L
- for(offsetDetail <- fetchRequest.offsetInfo) {
- for(i <- 0 until offsetDetail.partitions.size) {
- debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
+ val totalBytes = fetchRequest.requestInfo.foldLeft(0L)((folded, curr) => {
+ folded +
+ {
+ val (topic, partition) = (curr._1.topic, curr._1.partition)
+ val (offset, fetchSize) = (curr._2.offset, curr._2.fetchSize)
+ debug("Fetching log for topic %s partition %d".format(topic, partition))
try {
- val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i))
+ val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
val end = if (!fetchRequest.isFromFollower) {
leader.highWatermark
} else {
leader.logEndOffset
}
- val available = max(0, end - offsetDetail.offsets(i))
- totalBytes += math.min(offsetDetail.fetchSizes(i), available)
+ val available = max(0, end - offset)
+ math.min(fetchSize, available)
} catch {
case e: UnknownTopicOrPartitionException =>
- info("Invalid partition %d in fetch request from client %d."
- .format(offsetDetail.partitions(i), fetchRequest.clientId))
+ info("Invalid partition %d in fetch request from client %s."
+ .format(partition, fetchRequest.clientId))
+ 0
case e =>
error("Error determining available fetch bytes for topic %s partition %s on broker %s for client %s"
- .format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId), e)
+ .format(topic, partition, brokerId, fetchRequest.clientId), e)
+ 0
}
}
- }
+ })
trace(totalBytes + " available bytes for fetch request.")
totalBytes
}
private def maybeUpdatePartitionHW(fetchRequest: FetchRequest) {
- val offsets = fetchRequest.offsetInfo
- debug("Act on update partition HW, check offset detail: %s ".format(offsets))
- for(offsetDetail <- offsets) {
- val topic = offsetDetail.topic
- val (partitions, offsets) = (offsetDetail.partitions, offsetDetail.offsets)
- for( (partition, offset) <- (partitions, offsets).zipped.map((_,_))) {
- replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
- }
- }
+ debug("Maybe update partition HW due to fetch request: %s ".format(fetchRequest))
+ fetchRequest.requestInfo.foreach(info => {
+ val (topic, partition, offset) = (info._1.topic, info._1.partition, info._2.offset)
+ replicaManager.recordFollowerPosition(topic, partition, fetchRequest.replicaId, offset)
+ })
}
/**
- * Read from all the offset details given and produce an array of topic datas
- */
- private def readMessageSets(fetchRequest: FetchRequest): Array[TopicData] = {
- val offsets = fetchRequest.offsetInfo
- val fetchedData = new mutable.ArrayBuffer[TopicData]()
-
- for(offsetDetail <- offsets) {
- val info = new mutable.ArrayBuffer[PartitionData]()
- val topic = offsetDetail.topic
- val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
- for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
- val isFetchFromFollower = fetchRequest.isFromFollower()
- val partitionInfo =
- try {
- val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
- BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
- BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
- if (!isFetchFromFollower) {
- new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
- } else {
- debug("Leader %d for topic %s partition %d received fetch request from follower %d"
- .format(brokerId, topic, partition, fetchRequest.replicaId))
- debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
- .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
- new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
- }
- }
- catch {
- case e =>
- BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
- BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
- error("error when processing request " + (topic, partition, offset, fetchSize), e)
- new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
- offset, -1L, MessageSet.Empty)
+ * Read from all the offset details given and return a map of
+ * (topic, partition) -> PartitionData
+ */
+ private def readMessageSets(fetchRequest: FetchRequest) = {
+ val isFetchFromFollower = fetchRequest.isFromFollower
+ fetchRequest.requestInfo.map {
+ case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+ val partitionData = try {
+ val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
+ BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
+ BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
+ if (!isFetchFromFollower) {
+ new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+ } else {
+ debug("Leader %d for topic %s partition %d received fetch request from follower %d"
+ .format(brokerId, topic, partition, fetchRequest.replicaId))
+ debug("Leader %d returning %d messages for topic %s partition %d to follower %d"
+ .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+ new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
}
- info.append(partitionInfo)
- }
- fetchedData.append(new TopicData(topic, info.toArray))
+ }
+ catch {
+ case t: Throwable =>
+ BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
+ BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
+ error("error when processing request " + (topic, partition, offset, fetchSize), t)
+ new PartitionData(partition, ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
+ offset, -1L, MessageSet.Empty)
+ }
+ (TopicAndPartition(topic, partition), partitionData)
}
- fetchedData.toArray
}
/**
@@ -454,8 +431,14 @@ class KafkaApis(val requestChannel: Requ
private [kafka] case class RequestKey(topic: String, partition: Int)
extends MetricKey {
+
+ def this(topicAndPartition: TopicAndPartition) = this(topicAndPartition.topic, topicAndPartition.partition)
+
+ def topicAndPartition = TopicAndPartition(topic, partition)
+
override def keyLabel = "%s-%d".format(topic, partition)
}
+
/**
* A delayed fetch request
*/
@@ -465,9 +448,9 @@ class KafkaApis(val requestChannel: Requ
/**
* A holding pen for fetch requests waiting to be satisfied
*/
- class FetchRequestPurgatory(brokerId: Int, requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
+ class FetchRequestPurgatory(requestChannel: RequestChannel) extends RequestPurgatory[DelayedFetch, Null](brokerId) {
- this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
+ this.logIdent = "[FetchRequestPurgatory-%d] ".format(brokerId)
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
@@ -480,7 +463,7 @@ class KafkaApis(val requestChannel: Requ
*/
def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch)
- val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
+ val response = FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
val fromFollower = delayed.fetch.isFromFollower
delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
@@ -489,48 +472,43 @@ class KafkaApis(val requestChannel: Requ
class DelayedProduce(keys: Seq[RequestKey],
request: RequestChannel.Request,
- localErrors: Array[Short],
- requiredOffsets: Array[Long],
+ localProduceResponse: ProducerResponse,
val produce: ProducerRequest,
delayMs: Long)
extends DelayedRequest(keys, request, delayMs) with Logging {
+ private val initialErrorsAndOffsets = localProduceResponse.status
/**
* Map of (topic, partition) -> partition status
* The values in this map don't need to be synchronized since updates to the
* values are effectively synchronized by the ProducerRequestPurgatory's
* update method
*/
- private [kafka] val partitionStatus = keys.map(key => {
- val keyIndex = keys.indexOf(key)
+ private [kafka] val partitionStatus = keys.map(requestKey => {
+ val producerResponseStatus = initialErrorsAndOffsets(TopicAndPartition(requestKey.topic, requestKey.partition))
// if there was an error in writing to the local replica's log, then don't
// wait for acks on this partition
- val acksPending =
- if (localErrors(keyIndex) == ErrorMapping.NoError) {
+ val (acksPending, error, nextOffset) =
+ if (producerResponseStatus.error == ErrorMapping.NoError) {
// Timeout error state will be cleared when requiredAcks are received
- localErrors(keyIndex) = ErrorMapping.RequestTimedOutCode
- true
+ (true, ErrorMapping.RequestTimedOutCode, producerResponseStatus.nextOffset)
}
- else
- false
+ else (false, producerResponseStatus.error, producerResponseStatus.nextOffset)
- val initialStatus = new PartitionStatus(acksPending, localErrors(keyIndex), requiredOffsets(keyIndex))
- trace("Initial partition status for %s = %s".format(key, initialStatus))
- (key, initialStatus)
+ val initialStatus = PartitionStatus(acksPending, error, nextOffset)
+ trace("Initial partition status for %s = %s".format(requestKey.keyLabel, initialStatus))
+ (requestKey, initialStatus)
}).toMap
-
def respond() {
- val errorsAndOffsets: (List[Short], List[Long]) = (
- keys.foldRight
- ((List[Short](), List[Long]()))
- ((key: RequestKey, result: (List[Short], List[Long])) => {
- val status = partitionStatus(key)
- (status.error :: result._1, status.requiredOffset :: result._2)
- })
- )
- val response = new ProducerResponse(produce.versionId, produce.correlationId,
- errorsAndOffsets._1.toArray, errorsAndOffsets._2.toArray)
+
+ val finalErrorsAndOffsets = initialErrorsAndOffsets.map(
+ status => {
+ val pstat = partitionStatus(new RequestKey(status._1))
+ (status._1, ProducerResponseStatus(pstat.error, pstat.requiredOffset))
+ })
+
+ val response = ProducerResponse(produce.versionId, produce.correlationId, finalErrorsAndOffsets)
requestChannel.sendResponse(new RequestChannel.Response(
request, new BoundedByteBufferSend(response)))
@@ -565,9 +543,8 @@ class KafkaApis(val requestChannel: Requ
fetchPartitionStatus.error = ErrorMapping.NoError
}
if (!fetchPartitionStatus.acksPending) {
- val topicData = produce.data.find(_.topic == topic).get
- val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
- maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
+ val partitionData = produce.data(followerFetchRequestKey.topicAndPartition)
+ maybeUnblockDelayedFetchRequests(topic, partitionData)
}
}
@@ -576,9 +553,9 @@ class KafkaApis(val requestChannel: Requ
satisfied
}
- class PartitionStatus(var acksPending: Boolean,
+ case class PartitionStatus(var acksPending: Boolean,
var error: Short,
- val requiredOffset: Long) {
+ requiredOffset: Long) {
def setThisBrokerNotLeader() {
error = ErrorMapping.NotLeaderForPartitionCode
acksPending = false
@@ -594,9 +571,9 @@ class KafkaApis(val requestChannel: Requ
/**
* A holding pen for produce requests waiting to be satisfied.
*/
- private [kafka] class ProducerRequestPurgatory(brokerId: Int) extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
+ private [kafka] class ProducerRequestPurgatory extends RequestPurgatory[DelayedProduce, RequestKey](brokerId) {
- this.logIdent = "[ProducerRequestPurgatory-%d], ".format(brokerId)
+ this.logIdent = "[ProducerRequestPurgatory-%d] ".format(brokerId)
protected def checkSatisfied(followerFetchRequestKey: RequestKey,
delayedProduce: DelayedProduce) =
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Mon Sep 17 20:15:59 2012
@@ -23,7 +23,7 @@ class ReplicaFetcherManager(private val
extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
- new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+ new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d-on-broker-%d-".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
}
def shutdown() {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala Mon Sep 17 20:15:59 2012
@@ -20,6 +20,8 @@ package kafka.server
import kafka.api.{OffsetRequest, PartitionData}
import kafka.cluster.Broker
import kafka.message.ByteBufferMessageSet
+import kafka.common.TopicAndPartition
+
class ReplicaFetcherThread(name:String, sourceBroker: Broker, brokerConfig: KafkaConfig, replicaMgr: ReplicaManager)
extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = brokerConfig.replicaSocketTimeoutMs,
@@ -56,7 +58,7 @@ class ReplicaFetcherThread(name:String,
}
// any logic for partitions whose leader has changed
- def handlePartitionsWithErrors(partitions: Iterable[(String, Int)]) {
+ def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) {
// no handler needed since the controller will make the changes accordingly
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Sep 17 20:15:59 2012
@@ -367,7 +367,7 @@ object Utils extends Logging {
/**
* Read an unsigned integer from the given position without modifying the buffers
* position
- * @param The buffer to read from
+ * @param buffer the buffer to read from
* @param index the index from which to read the integer
* @return The integer read, as a long to avoid signedness
*/
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Mon Sep 17 20:15:59 2012
@@ -72,7 +72,7 @@ class LazyInitProducerTest extends JUnit
// send an invalid offset
try {
val fetchedWithError = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, -1, 10000).build())
- fetchedWithError.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+ fetchedWithError.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected an OffsetOutOfRangeException exception to be thrown")
} catch {
case e: OffsetOutOfRangeException =>
@@ -101,23 +101,22 @@ class LazyInitProducerTest extends JUnit
}
}
- {
- // send some invalid offsets
- val builder = new FetchRequestBuilder()
- for( (topic, offset) <- topicOffsets )
- builder.addFetch(topic, offset, -1, 10000)
+ // send some invalid offsets
+ val builder = new FetchRequestBuilder()
+ for( (topic, offset) <- topicOffsets )
+ builder.addFetch(topic, offset, -1, 10000)
+
+ val request = builder.build()
+ val responses = consumer.fetch(request)
+ responses.data.values.foreach(pd => {
+ try {
+ ErrorMapping.maybeThrowException(pd.error)
+ fail("Expected an OffsetOutOfRangeException exception to be thrown")
+ } catch {
+ case e: OffsetOutOfRangeException =>
- val request = builder.build()
- val responses = consumer.fetch(request)
- for(topicData <- responses.data) {
- try {
- topicData.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
- fail("Expected an OffsetOutOfRangeException exception to be thrown")
- } catch {
- case e: OffsetOutOfRangeException =>
- }
}
- }
+ })
}
def testMultiProduce() {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Mon Sep 17 20:15:59 2012
@@ -19,7 +19,7 @@ package kafka.integration
import java.nio.ByteBuffer
import junit.framework.Assert._
-import kafka.api.{OffsetDetail, FetchRequest, FetchRequestBuilder}
+import kafka.api.{PartitionFetchInfo, FetchRequest, FetchRequestBuilder}
import kafka.server.{KafkaRequestHandler, KafkaConfig}
import java.util.Properties
import kafka.producer.{ProducerData, Producer, ProducerConfig}
@@ -31,8 +31,8 @@ import org.I0Itec.zkclient.ZkClient
import kafka.zk.ZooKeeperTestHarness
import org.scalatest.junit.JUnit3Suite
import scala.collection._
-import kafka.common.{ErrorMapping, UnknownTopicOrPartitionException, FetchRequestFormatException, OffsetOutOfRangeException}
import kafka.admin.{AdminUtils, CreateTopicCommand}
+import kafka.common.{TopicAndPartition, ErrorMapping, UnknownTopicOrPartitionException, OffsetOutOfRangeException}
/**
* End to end tests of the primitive apis against a local server
@@ -77,27 +77,11 @@ class PrimitiveApiTest extends JUnit3Sui
assertEquals(request, deserializedRequest)
}
- def testFetchRequestEnforcesUniqueTopicsForOffsetDetails() {
- val offsets = Array(
- new OffsetDetail("topic1", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
- new OffsetDetail("topic2", Array(0, 1, 2), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
- new OffsetDetail("topic1", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000)),
- new OffsetDetail("topic2", Array(3, 4, 5), Array(0L, 0L, 0L), Array(1000, 1000, 1000))
- )
- val request = new FetchRequest(offsetInfo = offsets)
- try {
- consumer.fetch(request)
- fail("FetchRequest should throw FetchRequestFormatException due to duplicate topics")
- } catch {
- case e: FetchRequestFormatException => "success"
- }
- }
-
def testEmptyFetchRequest() {
- val offsets = Array[OffsetDetail]()
- val request = new FetchRequest(offsetInfo = offsets)
+ val partitionRequests = immutable.Map[TopicAndPartition, PartitionFetchInfo]()
+ val request = new FetchRequest(requestInfo = partitionRequests)
val fetched = consumer.fetch(request)
- assertTrue(fetched.errorCode == ErrorMapping.NoError && fetched.data.size == 0)
+ assertTrue(!fetched.hasError && fetched.data.size == 0)
}
def testDefaultEncoderProducerAndFetch() {
@@ -189,7 +173,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+ response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
@@ -205,7 +189,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+ response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: UnknownTopicOrPartitionException => "this is good"
@@ -253,7 +237,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+ response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid offset")
} catch {
case e: OffsetOutOfRangeException => "this is good"
@@ -269,7 +253,7 @@ class PrimitiveApiTest extends JUnit3Sui
try {
val request = builder.build()
val response = consumer.fetch(request)
- response.data.foreach(_.partitionDataArray.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error)))
+ response.data.values.foreach(pdata => ErrorMapping.maybeThrowException(pdata.error))
fail("Expected exception when fetching message with invalid partition")
} catch {
case e: UnknownTopicOrPartitionException => "this is good"
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala Mon Sep 17 20:15:59 2012
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package kafka.network;
+package kafka.network
import org.junit._
import org.scalatest.junit.JUnitSuite
@@ -24,31 +24,45 @@ import java.nio.ByteBuffer
import kafka.api._
import kafka.message.{Message, ByteBufferMessageSet}
import kafka.cluster.Broker
-import kafka.common.ErrorMapping
import collection.mutable._
+import kafka.common.{TopicAndPartition, ErrorMapping}
+
object RpcDataSerializationTestUtils{
private val topic1 = "test1"
private val topic2 = "test2"
- private val leader1 = 0;
+ private val leader1 = 0
private val isr1 = List(0, 1, 2)
- private val leader2 = 0;
+ private val leader2 = 0
private val isr2 = List(0, 2, 3)
private val partitionData0 = new PartitionData(0, new ByteBufferMessageSet(new Message("first message".getBytes)))
private val partitionData1 = new PartitionData(1, new ByteBufferMessageSet(new Message("second message".getBytes)))
private val partitionData2 = new PartitionData(2, new ByteBufferMessageSet(new Message("third message".getBytes)))
private val partitionData3 = new PartitionData(3, new ByteBufferMessageSet(new Message("fourth message".getBytes)))
private val partitionDataArray = Array(partitionData0, partitionData1, partitionData2, partitionData3)
- private val topicData1 = new TopicData(topic1, partitionDataArray)
- private val topicData2 = new TopicData(topic2, partitionDataArray)
- private val topicDataArray = Array(topicData1, topicData2)
- private val offsetDetail1 = new OffsetDetail(topic1, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
- private val offsetDetail2 = new OffsetDetail(topic2, Seq(0, 1, 2, 3), Seq(1000, 2000, 3000, 4000), Seq(100, 100, 100, 100))
- private val offsetDetailSeq = Seq(offsetDetail1, offsetDetail2)
- private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
- private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
- private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
- private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), Seq.empty)
+
+ private val topicData = {
+ val groupedData = Array(topic1, topic2).flatMap(topic =>
+ partitionDataArray.map(partitionData =>
+ (TopicAndPartition(topic, partitionData.partition), partitionData)))
+ collection.immutable.Map(groupedData:_*)
+ }
+
+ private val requestInfos = collection.immutable.Map(
+ TopicAndPartition(topic1, 0) -> PartitionFetchInfo(1000, 100),
+ TopicAndPartition(topic1, 1) -> PartitionFetchInfo(2000, 100),
+ TopicAndPartition(topic1, 2) -> PartitionFetchInfo(3000, 100),
+ TopicAndPartition(topic1, 3) -> PartitionFetchInfo(4000, 100),
+ TopicAndPartition(topic2, 0) -> PartitionFetchInfo(1000, 100),
+ TopicAndPartition(topic2, 1) -> PartitionFetchInfo(2000, 100),
+ TopicAndPartition(topic2, 2) -> PartitionFetchInfo(3000, 100),
+ TopicAndPartition(topic2, 3) -> PartitionFetchInfo(4000, 100)
+ )
+
+ private val partitionMetaData0 = new PartitionMetadata(0, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+ private val partitionMetaData1 = new PartitionMetadata(1, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+ private val partitionMetaData2 = new PartitionMetadata(2, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
+ private val partitionMetaData3 = new PartitionMetadata(3, Some(new Broker(0, "creator", "localhost", 1011)), collection.immutable.Seq.empty)
private val partitionMetaDataSeq = Seq(partitionMetaData0, partitionMetaData1, partitionMetaData2, partitionMetaData3)
private val topicmetaData1 = new TopicMetadata(topic1, partitionMetaDataSeq)
private val topicmetaData2 = new TopicMetadata(topic2, partitionMetaDataSeq)
@@ -78,19 +92,21 @@ object RpcDataSerializationTestUtils{
}
def createTestProducerRequest: ProducerRequest = {
- new ProducerRequest(1, "client 1", 0, 1000, topicDataArray)
+ new ProducerRequest(1, "client 1", 0, 1000, topicData)
}
- def createTestProducerResponse: ProducerResponse = {
- new ProducerResponse(1, 1, Array(0.toShort, 0.toShort), Array(1000l, 2000l), 0)
- }
+ def createTestProducerResponse: ProducerResponse =
+ ProducerResponse(1, 1, Map(
+ TopicAndPartition(topic1, 0) -> ProducerResponseStatus(0.toShort, 10001),
+ TopicAndPartition(topic2, 0) -> ProducerResponseStatus(0.toShort, 20001)
+ ))
def createTestFetchRequest: FetchRequest = {
- new FetchRequest(offsetInfo = offsetDetailSeq)
+ new FetchRequest(requestInfo = requestInfos)
}
def createTestFetchResponse: FetchResponse = {
- new FetchResponse(1, 1, topicDataArray)
+ FetchResponse(1, 1, topicData)
}
def createTestOffsetRequest: OffsetRequest = {
@@ -154,7 +170,7 @@ class RpcDataSerializationTest extends J
assertEquals("The original and deserialzed stopReplicaResponse should be the same", stopReplicaResponse,
deserializedStopReplicaResponse)
- buffer = ByteBuffer.allocate(producerRequest.sizeInBytes())
+ buffer = ByteBuffer.allocate(producerRequest.sizeInBytes)
producerRequest.writeTo(buffer)
buffer.rewind()
val deserializedProducerRequest = ProducerRequest.readFrom(buffer)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Mon Sep 17 20:15:59 2012
@@ -25,8 +25,10 @@ import kafka.utils.TestUtils
import java.util.Random
import junit.framework.Assert._
import kafka.producer.SyncProducerConfig
-import kafka.api.{TopicData, ProducerRequest}
+import kafka.api.{PartitionData, ProducerRequest}
import java.nio.ByteBuffer
+import kafka.common.TopicAndPartition
+
class SocketServerTest extends JUnitSuite {
@@ -75,9 +77,10 @@ class SocketServerTest extends JUnitSuit
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks
- val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+ val emptyRequest =
+ new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
- val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes())
+ val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes)
emptyRequest.writeTo(byteBuffer)
byteBuffer.rewind()
val serializedBytes = new Array[Byte](byteBuffer.remaining)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Mon Sep 17 20:15:59 2012
@@ -201,11 +201,11 @@ class AsyncProducerTest extends JUnit3Su
topic2Broker2Data.appendAll(List(new ProducerData[Int,Message]("topic2", 1, new Message("msg2".getBytes))))
val expectedResult = Some(Map(
0 -> Map(
- ("topic1", 0) -> topic1Broker1Data,
- ("topic2", 0) -> topic2Broker1Data),
+ TopicAndPartition("topic1", 0) -> topic1Broker1Data,
+ TopicAndPartition("topic2", 0) -> topic2Broker1Data),
1 -> Map(
- ("topic1", 1) -> topic1Broker2Data,
- ("topic2", 1) -> topic2Broker2Data)
+ TopicAndPartition("topic1", 1) -> topic1Broker2Data,
+ TopicAndPartition("topic2", 1) -> topic2Broker2Data)
))
val actualResult = handler.partitionAndCollate(producerDataList)
@@ -344,7 +344,7 @@ class AsyncProducerTest extends JUnit3Su
partitionedDataOpt match {
case Some(partitionedData) =>
for ((brokerId, dataPerBroker) <- partitionedData) {
- for ( ((topic, partitionId), dataPerTopic) <- dataPerBroker)
+ for ( (TopicAndPartition(topic, partitionId), dataPerTopic) <- dataPerBroker)
assertTrue(partitionId == 0)
}
case None =>
@@ -408,10 +408,12 @@ class AsyncProducerTest extends JUnit3Su
// entirely. The second request will succeed for partition 1 but fail for partition 0.
// On the third try for partition 0, let it succeed.
val request1 = TestUtils.produceRequestWithAcks(List(topic1), List(0, 1), TestUtils.messagesToSet(msgs), 0)
- val response1 =
- new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(ErrorMapping.NotLeaderForPartitionCode.toShort, 0.toShort), Array(0L, 0L))
+ val response1 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
+ Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NotLeaderForPartitionCode.toShort, 0L)),
+ (TopicAndPartition("topic1", 1), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val request2 = TestUtils.produceRequest(topic1, 0, TestUtils.messagesToSet(msgs))
- val response2 = new ProducerResponse(ProducerRequest.CurrentVersion, 0, Array(0.toShort), Array(0L))
+ val response2 = ProducerResponse(ProducerRequest.CurrentVersion, 0,
+ Map((TopicAndPartition("topic1", 0), ProducerResponseStatus(ErrorMapping.NoError, 0L))))
val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
EasyMock.expect(mockSyncProducer.send(request1)).andThrow(new RuntimeException) // simulate SocketTimeoutException
EasyMock.expect(mockSyncProducer.send(request1)).andReturn(response1)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Mon Sep 17 20:15:59 2012
@@ -21,14 +21,14 @@ import java.net.SocketTimeoutException
import java.util.Properties
import junit.framework.Assert
import kafka.admin.CreateTopicCommand
-import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
import kafka.integration.KafkaServerTestHarness
-import kafka.message.{NoCompressionCodec, DefaultCompressionCodec, Message, ByteBufferMessageSet}
+import kafka.message.{NoCompressionCodec, Message, ByteBufferMessageSet}
import kafka.server.KafkaConfig
import kafka.utils.{TestZKUtils, SystemTime, TestUtils}
import org.junit.Test
import org.scalatest.junit.JUnit3Suite
-import kafka.api.TopicData
+import kafka.api.{ProducerResponseStatus, PartitionData}
+import kafka.common.{TopicAndPartition, ErrorMapping}
class SyncProducerTest extends JUnit3Suite with KafkaServerTestHarness {
private var messageBytes = new Array[Byte](2);
@@ -85,11 +85,11 @@ class SyncProducerTest extends JUnit3Sui
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
val ack = SyncProducerConfig.DefaultRequiredAcks
- val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+ val emptyRequest = new kafka.api.ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Map[TopicAndPartition, PartitionData]())
val producer = new SyncProducer(new SyncProducerConfig(props))
val response = producer.send(emptyRequest)
- Assert.assertTrue(response.errorCode == ErrorMapping.NoError && response.errors.size == 0 && response.offsets.size == 0)
+ Assert.assertTrue(!response.hasError && response.status.size == 0)
}
@Test
@@ -109,17 +109,17 @@ class SyncProducerTest extends JUnit3Sui
val messageSet1 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message1)
val response1 = producer.send(TestUtils.produceRequest("test", 0, messageSet1))
- Assert.assertEquals(1, response1.errors.length)
- Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.errors(0))
- Assert.assertEquals(-1L, response1.offsets(0))
+ Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
+ Assert.assertEquals(ErrorMapping.MessageSizeTooLargeCode, response1.status(TopicAndPartition("test", 0)).error)
+ Assert.assertEquals(-1L, response1.status(TopicAndPartition("test", 0)).nextOffset)
val message2 = new Message(new Array[Byte](1000000))
val messageSet2 = new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = message2)
val response2 = producer.send(TestUtils.produceRequest("test", 0, messageSet2))
- Assert.assertEquals(1, response2.errors.length)
- Assert.assertEquals(ErrorMapping.NoError, response2.errors(0))
- Assert.assertEquals(messageSet2.sizeInBytes, response2.offsets(0))
+ Assert.assertEquals(1, response1.status.count(_._2.error != ErrorMapping.NoError))
+ Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("test", 0)).error)
+ Assert.assertEquals(messageSet2.sizeInBytes, response2.status(TopicAndPartition("test", 0)).nextOffset)
}
@Test
@@ -142,10 +142,12 @@ class SyncProducerTest extends JUnit3Sui
Assert.assertNotNull(response)
Assert.assertEquals(request.correlationId, response.correlationId)
- Assert.assertEquals(response.errors.length, response.offsets.length)
- Assert.assertEquals(3, response.errors.length)
- response.errors.foreach(Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, _))
- response.offsets.foreach(Assert.assertEquals(-1L, _))
+ Assert.assertEquals(3, response.status.size)
+ response.status.values.foreach {
+ case ProducerResponseStatus(error, nextOffset) =>
+ Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, error)
+ Assert.assertEquals(-1L, nextOffset)
+ }
// #2 - test that we get correct offsets when partition is owned by broker
CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
@@ -156,18 +158,18 @@ class SyncProducerTest extends JUnit3Sui
val response2 = producer.send(request)
Assert.assertNotNull(response2)
Assert.assertEquals(request.correlationId, response2.correlationId)
- Assert.assertEquals(response2.errors.length, response2.offsets.length)
- Assert.assertEquals(3, response2.errors.length)
+ Assert.assertEquals(3, response2.status.size)
// the first and last message should have been accepted by broker
- Assert.assertEquals(0, response2.errors(0))
- Assert.assertEquals(0, response2.errors(2))
- Assert.assertEquals(messages.sizeInBytes, response2.offsets(0))
- Assert.assertEquals(messages.sizeInBytes, response2.offsets(2))
+ Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic1", 0)).error)
+ Assert.assertEquals(ErrorMapping.NoError, response2.status(TopicAndPartition("topic3", 0)).error)
+ Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic1", 0)).nextOffset)
+ Assert.assertEquals(messages.sizeInBytes, response2.status(TopicAndPartition("topic3", 0)).nextOffset)
// the middle message should have been rejected because broker doesn't lead partition
- Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort, response2.errors(1))
- Assert.assertEquals(-1, response2.offsets(1))
+ Assert.assertEquals(ErrorMapping.UnknownTopicOrPartitionCode.toShort,
+ response2.status(TopicAndPartition("topic2", 0)).error)
+ Assert.assertEquals(-1, response2.status(TopicAndPartition("topic2", 0)).nextOffset)
}
@Test
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1386806&r1=1386805&r2=1386806&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Sep 17 20:15:59 2012
@@ -34,7 +34,9 @@ import kafka.consumer.ConsumerConfig
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kafka.api._
+import collection.mutable.Map
import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
+import kafka.common.TopicAndPartition
/**
@@ -364,28 +366,10 @@ object TestUtils extends Logging {
val correlationId = SyncProducerConfig.DefaultCorrelationId
val clientId = SyncProducerConfig.DefaultClientId
val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
- val data = topics.map(new TopicData(_, partitions.map(new PartitionData(_, message)).toArray))
- new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, data.toArray)
- }
-
- def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
- produceJavaRequest(-1,topic,-1,message)
- }
-
- def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
- produceJavaRequest(-1,topic,partition,message)
- }
-
- def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
- val clientId = "test"
- val requiredAcks: Short = 0
- val ackTimeoutMs = 0
- var data = new Array[TopicData](1)
- var partitionData = new Array[PartitionData](1)
- partitionData(0) = new PartitionData(partition,message.underlying)
- data(0) = new TopicData(topic,partitionData)
- val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
- pr
+ val data = topics.flatMap(topic =>
+ partitions.map(partition => (TopicAndPartition(topic, partition), new PartitionData(partition, message)))
+ )
+ new kafka.api.ProducerRequest(correlationId, clientId, acks.toShort, ackTimeoutMs, Map(data:_*))
}
def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {