You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/03/01 22:54:05 UTC

git commit: KAFKA-759 Commit/FetchOffset APIs should not return versionId; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/trunk 218e6a53c -> 82b11aa0d


KAFKA-759 Commit/FetchOffset APIs should not return versionId; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/82b11aa0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/82b11aa0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/82b11aa0

Branch: refs/heads/trunk
Commit: 82b11aa0d4bc32c5a351ace3a67cd2d57c9d1e8d
Parents: 218e6a5
Author: Neha Narkhede <ne...@gmail.com>
Authored: Fri Mar 1 13:54:00 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Fri Mar 1 13:54:00 2013 -0800

----------------------------------------------------------------------
 .../scala/kafka/api/OffsetCommitResponse.scala     |    6 +-----
 .../main/scala/kafka/api/OffsetFetchResponse.scala |    6 +-----
 core/src/main/scala/kafka/server/KafkaApis.scala   |    2 --
 3 files changed, 2 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/82b11aa0/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 4e1313e..3b0d861 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -29,7 +29,6 @@ object OffsetCommitResponse extends Logging {
 
   def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
     // Read values from the envelope
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
 
@@ -44,12 +43,11 @@ object OffsetCommitResponse extends Logging {
         (TopicAndPartition(topic, partitionId), error)
       })
     })
-    OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId)
+    OffsetCommitResponse(Map(pairs:_*), correlationId, clientId)
   }
 }
 
 case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
-                               versionId: Short = OffsetCommitResponse.CurrentVersion,
                                correlationId: Int = 0,
                                clientId: String = OffsetCommitResponse.DefaultClientId)
     extends RequestOrResponse {
@@ -58,7 +56,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
 
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
-    buffer.putShort(versionId)
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
 
@@ -75,7 +72,6 @@ case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
   }
 
   override def sizeInBytes = 
-    2 + /* versionId */
     4 + /* correlationId */
     shortStringLength(clientId) +
     4 + /* topic count */

http://git-wip-us.apache.org/repos/asf/kafka/blob/82b11aa0/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
index fb5e6cb..3d4ce2a 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -29,7 +29,6 @@ object OffsetFetchResponse extends Logging {
 
   def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
     // Read values from the envelope
-    val versionId = buffer.getShort
     val correlationId = buffer.getInt
     val clientId = readShortString(buffer)
 
@@ -46,12 +45,11 @@ object OffsetFetchResponse extends Logging {
         (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
       })
     })
-    OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId)
+    OffsetFetchResponse(Map(pairs:_*), correlationId, clientId)
   }
 }
 
 case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
-                               versionId: Short = OffsetFetchResponse.CurrentVersion,
                                correlationId: Int = 0,
                                clientId: String = OffsetFetchResponse.DefaultClientId)
     extends RequestOrResponse {
@@ -60,7 +58,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
 
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
-    buffer.putShort(versionId)
     buffer.putInt(correlationId)
     writeShortString(buffer, clientId)
 
@@ -79,7 +76,6 @@ case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadat
   }
 
   override def sizeInBytes = 
-    2 + /* versionId */
     4 + /* correlationId */
     shortStringLength(clientId) +
     4 + /* topic count */

http://git-wip-us.apache.org/repos/asf/kafka/blob/82b11aa0/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c059981..5c5dbc9 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -489,7 +489,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     })
     val response = new OffsetCommitResponse(responseInfo, 
-                                        offsetCommitRequest.versionId, 
                                         offsetCommitRequest.correlationId,
                                         offsetCommitRequest.clientId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
@@ -521,7 +520,6 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     })
     val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
-                                        offsetFetchRequest.versionId, 
                                         offsetFetchRequest.correlationId,
                                         offsetFetchRequest.clientId)
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))