You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/06 17:23:13 UTC
svn commit: r1417977 - in /kafka/branches/0.8/core/src/main/scala/kafka:
api/FetchResponse.scala api/TopicMetadataResponse.scala
server/KafkaApis.scala
Author: jkreps
Date: Thu Dec 6 16:23:12 2012
New Revision: 1417977
URL: http://svn.apache.org/viewvc?rev=1417977&view=rev
Log:
KAFKA-642 Addressing Jun's follow-up comments: (1) add parens to make the statement clearer, (2) remove the initial offset from the fetch response, since the message set itself now contains all offsets.
Modified:
kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Thu Dec 6 16:23:12 2012
@@ -27,29 +27,25 @@ import kafka.api.ApiUtils._
object FetchResponsePartitionData {
def readFrom(buffer: ByteBuffer): FetchResponsePartitionData = {
val error = buffer.getShort
- val initialOffset = buffer.getLong
val hw = buffer.getLong
val messageSetSize = buffer.getInt
val messageSetBuffer = buffer.slice()
messageSetBuffer.limit(messageSetSize)
buffer.position(buffer.position + messageSetSize)
- new FetchResponsePartitionData(error, initialOffset,
- hw, new ByteBufferMessageSet(messageSetBuffer))
+ new FetchResponsePartitionData(error, hw, new ByteBufferMessageSet(messageSetBuffer))
}
val headerSize =
2 + /* error code */
- 8 + /* initialOffset */
8 + /* high watermark */
4 /* messageSetSize */
}
-case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError,
- initialOffset:Long = 0L, hw: Long = -1L, messages: MessageSet) {
+case class FetchResponsePartitionData(error: Short = ErrorMapping.NoError, hw: Long = -1L, messages: MessageSet) {
val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes
- def this(messages: MessageSet) = this(ErrorMapping.NoError, 0L, -1L, messages)
+ def this(messages: MessageSet) = this(ErrorMapping.NoError, -1L, messages)
}
@@ -63,7 +59,6 @@ class PartitionDataSend(val partitionId:
private val buffer = ByteBuffer.allocate( 4 /** partitionId **/ + FetchResponsePartitionData.headerSize)
buffer.putInt(partitionId)
buffer.putShort(partitionData.error)
- buffer.putLong(partitionData.initialOffset)
buffer.putLong(partitionData.hw)
buffer.putInt(partitionData.messages.sizeInBytes)
buffer.rewind()
Modified: kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataResponse.scala Thu Dec 6 16:23:12 2012
@@ -56,7 +56,7 @@ case class TopicMetadataResponse(version
def extractBrokers(topicMetadatas: Seq[TopicMetadata]): Map[Int, Broker] = {
val parts = topicsMetadata.flatMap(_.partitionsMetadata)
- val brokers = parts.flatMap(_.replicas) ++ parts.map(_.leader).collect{case Some(l) => l}
+ val brokers = (parts.flatMap(_.replicas)) ++ (parts.map(_.leader).collect{case Some(l) => l})
brokers.map(b => (b.id, b)).toMap
}
}
Modified: kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1417977&r1=1417976&r2=1417977&view=diff
==============================================================================
--- kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Dec 6 16:23:12 2012
@@ -77,7 +77,7 @@ class KafkaApis(val requestChannel: Requ
val apiRequest = request.requestObj.asInstanceOf[FetchRequest]
val fetchResponsePartitionData = apiRequest.requestInfo.map {
case (topicAndPartition, data) =>
- (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), 0, -1, null))
+ (topicAndPartition, FetchResponsePartitionData(ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]), -1, null))
}
val errorResponse = FetchResponse(apiRequest.versionId, apiRequest.correlationId, fetchResponsePartitionData)
requestChannel.sendResponse(new RequestChannel.Response(request, new FetchResponseSend(errorResponse)))
@@ -326,19 +326,18 @@ class KafkaApis(val requestChannel: Requ
BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(messages.sizeInBytes)
BrokerTopicStats.getBrokerAllTopicStats.bytesOutRate.mark(messages.sizeInBytes)
if (!isFetchFromFollower) {
- new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
+ new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
} else {
debug("Leader %d for topic %s partition %d received fetch request from follower %d"
.format(brokerId, topic, partition, fetchRequest.replicaId))
- new FetchResponsePartitionData(ErrorMapping.NoError, offset, highWatermark, messages)
+ new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, messages)
}
} catch {
case t: Throwable =>
BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
BrokerTopicStats.getBrokerAllTopicStats.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), t)
- new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]),
- offset, -1L, MessageSet.Empty)
+ new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty)
}
(TopicAndPartition(topic, partition), partitionData)
}