You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/01 22:54:05 UTC
git commit: KAFKA-759 Commit/FetchOffset APIs should not return versionId; reviewed by Neha Narkhede
Updated Branches:
refs/heads/trunk 218e6a53c -> 82b11aa0d
KAFKA-759 Commit/FetchOffset APIs should not return versionId; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82b11aa0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82b11aa0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82b11aa0
Branch: refs/heads/trunk
Commit: 82b11aa0d4bc32c5a351ace3a67cd2d57c9d1e8d
Parents: 218e6a5
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Mar 1 13:54:00 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Mar 1 13:54:00 2013 -0800
----------------------------------------------------------------------
.../scala/kafka/api/OffsetCommitResponse.scala | 6 +-----
.../main/scala/kafka/api/OffsetFetchResponse.scala | 6 +-----
core/src/main/scala/kafka/server/KafkaApis.scala | 2 --
3 files changed, 2 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/82b11aa0/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 4e1313e..3b0d861 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -29,7 +29,6 @@ object OffsetCommitResponse extends Logging {
def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
// Read values from the envelope
- val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
@@ -44,12 +43,11 @@ object OffsetCommitResponse extends Logging {
(TopicAndPartition(topic, partitionId), error)
})
})
- OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId)
+ OffsetCommitResponse(Map(pairs:_*), correlationId, clientId)
}
}
case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
- versionId: Short = OffsetCommitResponse.CurrentVersion,
correlationId: Int = 0,
clientId: String = OffsetCommitResponse.DefaultClientId)
extends RequestOrResponse {
@@ -58,7 +56,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
def writeTo(buffer: ByteBuffer) {
// Write envelope
- buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
@@ -75,7 +72,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
}
override def sizeInBytes =
- 2 + /* versionId */
4 + /* correlationId */
shortStringLength(clientId) +
4 + /* topic count */
http://git-wip-us.apache.org/repos/asf/kafka/blob/82b11aa0/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index fb5e6cb..3d4ce2a 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -29,7 +29,6 @@ object OffsetFetchResponse extends Logging {
def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
// Read values from the envelope
- val versionId = buffer.getShort
val correlationId = buffer.getInt
val clientId = readShortString(buffer)
@@ -46,12 +45,11 @@ object OffsetFetchResponse extends Logging {
(TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
})
})
- OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId)
+ OffsetFetchResponse(Map(pairs:_*), correlationId, clientId)
}
}
case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
- versionId: Short = OffsetFetchResponse.CurrentVersion,
correlationId: Int = 0,
clientId: String = OffsetFetchResponse.DefaultClientId)
extends RequestOrResponse {
@@ -60,7 +58,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
def writeTo(buffer: ByteBuffer) {
// Write envelope
- buffer.putShort(versionId)
buffer.putInt(correlationId)
writeShortString(buffer, clientId)
@@ -79,7 +76,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
}
override def sizeInBytes =
- 2 + /* versionId */
4 + /* correlationId */
shortStringLength(clientId) +
4 + /* topic count */
http://git-wip-us.apache.org/repos/asf/kafka/blob/82b11aa0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c059981..5c5dbc9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -489,7 +489,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
val response = new OffsetCommitResponse(responseInfo,
- offsetCommitRequest.versionId,
offsetCommitRequest.correlationId,
offsetCommitRequest.clientId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -521,7 +520,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
})
val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*),
- offsetFetchRequest.versionId,
offsetFetchRequest.correlationId,
offsetFetchRequest.clientId)
requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))