You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2015/01/13 07:32:44 UTC

kafka git commit: KAFKA-1841; OffsetCommitRequest API - timestamp field is not versioned; patched by Jun Rao; reviewed by Joel Koshy

Repository: kafka
Updated Branches:
  refs/heads/0.8.2 9b6744d3a -> 432d397af


KAFKA-1841; OffsetCommitRequest API - timestamp field is not versioned; patched by Jun Rao; reviewed by Joel Koshy


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/432d397a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/432d397a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/432d397a

Branch: refs/heads/0.8.2
Commit: 432d397af8a1d4467fe8041bcff8790864010a80
Parents: 9b6744d
Author: Jun Rao <ju...@gmail.com>
Authored: Mon Jan 12 22:32:31 2015 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Jan 12 22:32:31 2015 -0800

----------------------------------------------------------------------
 .../apache/kafka/common/protocol/Protocol.java  | 36 +++++++-
 .../common/requests/OffsetCommitRequest.java    | 22 ++++-
 .../scala/kafka/api/OffsetCommitRequest.scala   | 21 +++--
 .../scala/kafka/api/OffsetCommitResponse.scala  |  5 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |  4 +-
 .../kafka/common/OffsetMetadataAndError.scala   |  2 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 96 ++++++++++++++++----
 .../api/RequestResponseSerializationTest.scala  |  4 +-
 8 files changed, 151 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 7517b87..f0a262e 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -111,6 +111,16 @@ public class Protocol {
                                                                          new Field("offset",
                                                                                    INT64,
                                                                                    "Message offset to be committed."),
+                                                                         new Field("metadata",
+                                                                                   STRING,
+                                                                                   "Any associated metadata the client wants to keep."));
+
+    public static Schema OFFSET_COMMIT_REQUEST_PARTITION_V1 = new Schema(new Field("partition",
+                                                                                   INT32,
+                                                                                   "Topic partition id."),
+                                                                         new Field("offset",
+                                                                                   INT64,
+                                                                                   "Message offset to be committed."),
                                                                          new Field("timestamp",
                                                                                    INT64,
                                                                                    "Timestamp of the commit"),
@@ -125,6 +135,13 @@ public class Protocol {
                                                                                  new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V0),
                                                                                  "Partitions to commit offsets."));
 
+    public static Schema OFFSET_COMMIT_REQUEST_TOPIC_V1 = new Schema(new Field("topic",
+                                                                               STRING,
+                                                                               "Topic to commit."),
+                                                                     new Field("partitions",
+                                                                               new ArrayOf(OFFSET_COMMIT_REQUEST_PARTITION_V1),
+                                                                               "Partitions to commit offsets."));
+
     public static Schema OFFSET_COMMIT_REQUEST_V0 = new Schema(new Field("group_id",
                                                                          STRING,
                                                                          "The consumer group id."),
@@ -142,7 +159,7 @@ public class Protocol {
                                                                          STRING,
                                                                          "The consumer id assigned by the group coordinator."),
                                                                new Field("topics",
-                                                                         new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V0),
+                                                                         new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V1),
                                                                          "Topics to commit offsets."));
 
     public static Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
@@ -158,9 +175,11 @@ public class Protocol {
     public static Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses",
                                                                           new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0)));
 
-    public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
     /* The response types for both V0 and V1 of OFFSET_COMMIT_REQUEST are the same. */
-    public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V0};
+    public static Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0;
+
+    public static Schema[] OFFSET_COMMIT_REQUEST = new Schema[] { OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1 };
+    public static Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] { OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1};
 
     /* Offset fetch api */
     public static Schema OFFSET_FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition",
@@ -181,6 +200,10 @@ public class Protocol {
                                                                         new ArrayOf(OFFSET_FETCH_REQUEST_TOPIC_V0),
                                                                         "Topics to fetch offsets."));
 
+    // Versions 0 and 1 have exactly the same wire format but different behavior:
+    // version 0 reads the offsets from ZooKeeper (ZK), whereas version 1 reads them from Kafka.
+    public static Schema OFFSET_FETCH_REQUEST_V1 = OFFSET_FETCH_REQUEST_V0;
+
     public static Schema OFFSET_FETCH_RESPONSE_PARTITION_V0 = new Schema(new Field("partition",
                                                                                    INT32,
                                                                                    "Topic partition id."),
@@ -200,8 +223,11 @@ public class Protocol {
     public static Schema OFFSET_FETCH_RESPONSE_V0 = new Schema(new Field("responses",
                                                                          new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)));
 
-    public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0 };
-    public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0 };
+    /* The response types for both V0 and V1 of OFFSET_FETCH_REQUEST are the same. */
+    public static Schema OFFSET_FETCH_RESPONSE_V1 = OFFSET_FETCH_RESPONSE_V0;
+
+    public static Schema[] OFFSET_FETCH_REQUEST = new Schema[] { OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1 };
+    public static Schema[] OFFSET_FETCH_RESPONSE = new Schema[] { OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1 };
 
     /* List offset api */
     public static Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition",

http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 3ee5cba..66c0772 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -47,6 +47,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
 
     public static final int DEFAULT_GENERATION_ID = -1;
     public static final String DEFAULT_CONSUMER_ID = "";
+    public static final long DEFAULT_TIMESTAMP = -1L;
 
     private final String groupId;
     private final int generationId;
@@ -58,6 +59,11 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         public final long timestamp;
         public final String metadata;
 
+        // for v0, which carries no timestamp on the wire; DEFAULT_TIMESTAMP is used instead
+        public PartitionData(long offset, String metadata) {
+            this(offset, DEFAULT_TIMESTAMP, metadata);
+        }
+
         public PartitionData(long offset, long timestamp, String metadata) {
             this.offset = offset;
             this.timestamp = timestamp;
@@ -73,7 +79,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     @Deprecated
     public OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(ProtoUtils.requestSchema(ApiKeys.OFFSET_COMMIT.id, 0)));
-        initCommonFields(groupId, offsetData);
+        initCommonFields(groupId, offsetData, 0);
         this.groupId = groupId;
         this.generationId = DEFAULT_GENERATION_ID;
         this.consumerId = DEFAULT_CONSUMER_ID;
@@ -90,7 +96,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
     public OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData) {
         super(new Struct(curSchema));
 
-        initCommonFields(groupId, offsetData);
+        initCommonFields(groupId, offsetData, 1);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(CONSUMER_ID_KEY_NAME, consumerId);
         this.groupId = groupId;
@@ -99,7 +105,7 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
         this.offsetData = offsetData;
     }
 
-    private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData) {
+    private void initCommonFields(String groupId, Map<TopicPartition, PartitionData> offsetData, int versionId) {
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(offsetData);
 
         struct.set(GROUP_ID_KEY_NAME, groupId);
@@ -113,7 +119,8 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
                 partitionData.set(COMMIT_OFFSET_KEY_NAME, fetchPartitionData.offset);
-                partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
+                if (versionId == 1)
+                    partitionData.set(TIMESTAMP_KEY_NAME, fetchPartitionData.timestamp);
                 partitionData.set(METADATA_KEY_NAME, fetchPartitionData.metadata);
                 partitionArray.add(partitionData);
             }
@@ -133,7 +140,12 @@ public class OffsetCommitRequest extends AbstractRequestResponse {
                 Struct partitionResponse = (Struct) partitionResponseObj;
                 int partition = partitionResponse.getInt(PARTITION_KEY_NAME);
                 long offset = partitionResponse.getLong(COMMIT_OFFSET_KEY_NAME);
-                long timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                long timestamp;
+                // timestamp only exists in v1
+                if (partitionResponse.hasField(TIMESTAMP_KEY_NAME))
+                    timestamp = partitionResponse.getLong(TIMESTAMP_KEY_NAME);
+                else
+                    timestamp = DEFAULT_TIMESTAMP;
                 String metadata = partitionResponse.getString(METADATA_KEY_NAME);
                 PartitionData partitionData = new PartitionData(offset, timestamp, metadata);
                 offsetData.put(new TopicPartition(topic, partition), partitionData);

http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 861a6cf..39607c7 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -30,8 +30,6 @@ object OffsetCommitRequest extends Logging {
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
-    val now = SystemTime.milliseconds
-
     // Read values from the envelope
     val versionId = buffer.getShort
     assert(versionId == 0 || versionId == 1,
@@ -59,8 +57,11 @@ object OffsetCommitRequest extends Logging {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
         val timestamp = {
-          val given = buffer.getLong
-          if (given == -1L) now else given
+          if (versionId == 1) {
+            val given = buffer.getLong
+            given
+          } else
+            OffsetAndMetadata.InvalidTime
         }
         val metadata = readShortString(buffer)
         (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
@@ -68,6 +69,13 @@ object OffsetCommitRequest extends Logging {
     })
     OffsetCommitRequest(consumerGroupId, immutable.Map(pairs:_*), versionId, correlationId, clientId, groupGenerationId, consumerId)
   }
+
+  def changeInvalidTimeToCurrentTime(offsetCommitRequest: OffsetCommitRequest) {
+    val now = SystemTime.milliseconds
+    for ( (topicAndPartiiton, offsetAndMetadata) <- offsetCommitRequest.requestInfo)
+      if (offsetAndMetadata.timestamp == OffsetAndMetadata.InvalidTime)
+        offsetAndMetadata.timestamp = now
+  }
 }
 
 case class OffsetCommitRequest(groupId: String,
@@ -121,7 +129,8 @@ case class OffsetCommitRequest(groupId: String,
       t1._2.foreach( t2 => {
         buffer.putInt(t2._1.partition)
         buffer.putLong(t2._2.offset)
-        buffer.putLong(t2._2.timestamp)
+        if (versionId == 1)
+          buffer.putLong(t2._2.timestamp)
         writeShortString(buffer, t2._2.metadata)
       })
     })
@@ -143,7 +152,7 @@ case class OffsetCommitRequest(groupId: String,
         innerCount +
         4 /* partition */ +
         8 /* offset */ +
-        8 /* timestamp */ +
+        (if (versionId == 1) 8 else 0 ) /* timestamp */ +
         shortStringLength(offsetAndMetadata._2.metadata)
       })
     })

http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 624a1c1..03dd736 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -23,7 +23,7 @@ import kafka.utils.Logging
 import kafka.common.TopicAndPartition
 
 object OffsetCommitResponse extends Logging {
-  val CurrentVersion: Short = 0
+  val CurrentVersion: Short = 1
 
   def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
     val correlationId = buffer.getInt
@@ -41,6 +41,9 @@ object OffsetCommitResponse extends Logging {
   }
 }
 
+/**
+ *  Single constructor for both version 0 and 1 since they have the same format.
+ */
 case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
                                 correlationId: Int = 0)
     extends RequestOrResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index c7604b9..74ee829 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -25,7 +25,9 @@ import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.network.RequestChannel.Response
 import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 object OffsetFetchRequest extends Logging {
-  val CurrentVersion: Short = 0
+  // Versions 0 and 1 have exactly the same wire format but different behavior:
+  // version 0 reads the offsets from ZooKeeper (ZK), whereas version 1 reads them from Kafka.
+  val CurrentVersion: Short = 1
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 4cabffe..db7157d 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -19,7 +19,7 @@ package kafka.common
 
 case class OffsetAndMetadata(offset: Long,
                              metadata: String = OffsetAndMetadata.NoMetadata,
-                             timestamp: Long = -1L) {
+                             var timestamp: Long = -1L) {
   override def toString = "OffsetAndMetadata[%d,%s%s]"
                           .format(offset,
                                   if (metadata != null && metadata.length > 0) metadata else "NO_METADATA",

http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 9a61fcb..d626b17 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -25,7 +25,7 @@ import kafka.network._
 import kafka.admin.AdminUtils
 import kafka.network.RequestChannel.Response
 import kafka.controller.KafkaController
-import kafka.utils.{SystemTime, Logging}
+import kafka.utils.{ZkUtils, ZKGroupTopicDirs, SystemTime, Logging}
 
 import scala.collection._
 
@@ -64,7 +64,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
         case RequestKeys.UpdateMetadataKey => handleUpdateMetadataRequest(request)
         case RequestKeys.ControlledShutdownKey => handleControlledShutdownRequest(request)
-        case RequestKeys.OffsetCommitKey => handleProducerOrOffsetCommitRequest(request)
+        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
         case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case RequestKeys.ConsumerMetadataKey => handleConsumerMetadataRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
@@ -77,6 +77,40 @@ class KafkaApis(val requestChannel: RequestChannel,
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
   }
 
+  def handleOffsetCommitRequest(request: RequestChannel.Request) {
+    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+    if (offsetCommitRequest.versionId == 0) {
+      // version 0 stores the offsets in ZK
+      val responseInfo = offsetCommitRequest.requestInfo.map{
+        case (topicAndPartition, metaAndError) => {
+          val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
+          try {
+            ensureTopicExists(topicAndPartition.topic)
+            if(metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
+              (topicAndPartition, ErrorMapping.OffsetMetadataTooLargeCode)
+            } else {
+              ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+                topicAndPartition.partition, metaAndError.offset.toString)
+              (topicAndPartition, ErrorMapping.NoError)
+            }
+          } catch {
+            case e: Throwable => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+          }
+        }
+      }
+      val response = new OffsetCommitResponse(responseInfo, offsetCommitRequest.correlationId)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    } else {
+      // version 1 and above store the offsets in a special Kafka topic
+      handleProducerOrOffsetCommitRequest(request)
+    }
+  }
+
+  private def ensureTopicExists(topic: String) = {
+    if (metadataCache.getTopicMetadata(Set(topic)).size <= 0)
+      throw new UnknownTopicOrPartitionException("Topic " + topic + " either doesn't exist or is in the process of being deleted")
+  }
+
   def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
     // ensureTopicExists is only for client facing requests
     // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
@@ -154,6 +188,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     val (produceRequest, offsetCommitRequestOpt) =
       if (request.requestId == RequestKeys.OffsetCommitKey) {
         val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+        OffsetCommitRequest.changeInvalidTimeToCurrentTime(offsetCommitRequest)
         (producerRequestFromOffsetCommit(offsetCommitRequest), Some(offsetCommitRequest))
       } else {
         (request.requestObj.asInstanceOf[ProducerRequest], None)
@@ -504,22 +539,47 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
     val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
 
-    val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
-      metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
-    )
-    val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
-    val knownStatus =
-      if (knownTopicPartitions.size > 0)
-        offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
-      else
-        Map.empty[TopicAndPartition, OffsetMetadataAndError]
-    val status = unknownStatus ++ knownStatus
-
-    val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
-
-    trace("Sending offset fetch response %s for correlation id %d to client %s."
-          .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
-    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    if (offsetFetchRequest.versionId == 0) {
+      // version 0 reads offsets from ZK
+      val responseInfo = offsetFetchRequest.requestInfo.map( t => {
+        val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
+        try {
+          ensureTopicExists(t.topic)
+          val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
+          payloadOpt match {
+            case Some(payload) => {
+              (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError))
+            }
+            case None => (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
+              ErrorMapping.UnknownTopicOrPartitionCode))
+          }
+        } catch {
+          case e: Throwable =>
+            (t, OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata,
+              ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+        }
+      })
+      val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), offsetFetchRequest.correlationId)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    } else {
+      // version 1 reads offsets from Kafka
+      val (unknownTopicPartitions, knownTopicPartitions) = offsetFetchRequest.requestInfo.partition(topicAndPartition =>
+        metadataCache.getPartitionInfo(topicAndPartition.topic, topicAndPartition.partition).isEmpty
+      )
+      val unknownStatus = unknownTopicPartitions.map(topicAndPartition => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)).toMap
+      val knownStatus =
+        if (knownTopicPartitions.size > 0)
+          offsetManager.getOffsets(offsetFetchRequest.groupId, knownTopicPartitions).toMap
+        else
+          Map.empty[TopicAndPartition, OffsetMetadataAndError]
+      val status = unknownStatus ++ knownStatus
+
+      val response = OffsetFetchResponse(status, offsetFetchRequest.correlationId)
+
+      trace("Sending offset fetch response %s for correlation id %d to client %s."
+            .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
+      requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+    }
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/kafka/blob/432d397a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index cd16ced..4e817a2 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -163,8 +163,8 @@ object SerializationTestUtils {
       versionId = 0,
       groupId = "group 1",
       requestInfo = collection.immutable.Map(
-        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata", timestamp=SystemTime.milliseconds),
-        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata, timestamp=SystemTime.milliseconds)
+        TopicAndPartition(topic1, 0) -> OffsetAndMetadata(offset=42L, metadata="some metadata"),
+        TopicAndPartition(topic1, 1) -> OffsetAndMetadata(offset=100L, metadata=OffsetAndMetadata.NoMetadata)
       ))
   }