You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/03/08 03:33:55 UTC

kafka git commit: KAFKA-2068; KAFKA-2069; Replace OffsetCommit and OffsetFetch Request/Response with o.a.k.c.requests equivalent

Repository: kafka
Updated Branches:
  refs/heads/trunk 2fed0c62d -> 8d0c298c8


KAFKA-2068; KAFKA-2069; Replace OffsetCommit and OffsetFetch Request/Response with o.a.k.c.requests equivalent

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ismael Juma

Closes #927 from granthenke/offset-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8d0c298c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8d0c298c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8d0c298c

Branch: refs/heads/trunk
Commit: 8d0c298c8c3283a7f8cffc4f68b3af87b0588e07
Parents: 2fed0c6
Author: Grant Henke <gr...@gmail.com>
Authored: Mon Mar 7 18:33:52 2016 -0800
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Mon Mar 7 18:33:52 2016 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/GroupCoordinator.scala    |  18 +-
 .../coordinator/GroupMetadataManager.scala      |  33 +--
 .../scala/kafka/network/RequestChannel.scala    |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 276 ++++++++++---------
 .../scala/kafka/tools/DumpLogSegments.scala     |  14 +-
 .../GroupCoordinatorResponseTest.scala          |  12 +-
 6 files changed, 181 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
index cb08358..36d7bbb 100644
--- a/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupCoordinator.scala
@@ -24,9 +24,10 @@ import kafka.log.LogConfig
 import kafka.message.UncompressedCodec
 import kafka.server._
 import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.requests.{OffsetFetchResponse, JoinGroupRequest}
 
 import scala.collection.{Map, Seq, immutable}
 
@@ -381,8 +382,8 @@ class GroupCoordinator(val brokerId: Int,
   def handleCommitOffsets(groupId: String,
                           memberId: String,
                           generationId: Int,
-                          offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicAndPartition, Short] => Unit) {
+                          offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicPartition, Short] => Unit) {
     var delayedOffsetStore: Option[DelayedStore] = None
 
     if (!isActive.get) {
@@ -425,13 +426,16 @@ class GroupCoordinator(val brokerId: Int,
   }
 
   def handleFetchOffsets(groupId: String,
-                         partitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+                         partitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
     if (!isActive.get) {
-      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupCoordinatorNotAvailable)}.toMap
+      partitions.map { case topicPartition =>
+        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code))}.toMap
     } else if (!isCoordinatorForGroup(groupId)) {
-      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.NotCoordinatorForGroup)}.toMap
+      partitions.map { case topicPartition =>
+        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))}.toMap
     } else if (isCoordinatorLoadingInProgress(groupId)) {
-      partitions.map {case topicAndPartition => (topicAndPartition, OffsetMetadataAndError.GroupLoading)}.toMap
+      partitions.map { case topicPartition =>
+        (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_LOAD_IN_PROGRESS.code))}.toMap
     } else {
       // return offsets blindly regardless the current group state since the group may be using
       // Kafka commit storage without automatic group management

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 2c29172..cbdb854 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -28,6 +28,7 @@ import org.apache.kafka.common.protocol.types.Type.INT32
 import org.apache.kafka.common.protocol.types.Type.INT64
 import org.apache.kafka.common.protocol.types.Type.BYTES
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.apache.kafka.common.requests.OffsetFetchResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.clients.consumer.ConsumerRecord
@@ -243,10 +244,10 @@ class GroupMetadataManager(val brokerId: Int,
   def prepareStoreOffsets(groupId: String,
                           consumerId: String,
                           generationId: Int,
-                          offsetMetadata: immutable.Map[TopicAndPartition, OffsetAndMetadata],
-                          responseCallback: immutable.Map[TopicAndPartition, Short] => Unit): DelayedStore = {
+                          offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
+                          responseCallback: immutable.Map[TopicPartition, Short] => Unit): DelayedStore = {
     // first filter out partitions with offset metadata size exceeding limit
-    val filteredOffsetMetadata = offsetMetadata.filter { case (topicAndPartition, offsetAndMetadata) =>
+    val filteredOffsetMetadata = offsetMetadata.filter { case (topicPartition, offsetAndMetadata) =>
       validateOffsetMetadataLength(offsetAndMetadata.metadata)
     }
 
@@ -319,26 +320,26 @@ class GroupMetadataManager(val brokerId: Int,
    * The most important guarantee that this API provides is that it should never return a stale offset. i.e., it either
    * returns the current offset or it begins to sync the cache from the log (and returns an error code).
    */
-  def getOffsets(group: String, topicPartitions: Seq[TopicAndPartition]): Map[TopicAndPartition, OffsetMetadataAndError] = {
+  def getOffsets(group: String, topicPartitions: Seq[TopicPartition]): Map[TopicPartition, OffsetFetchResponse.PartitionData] = {
     trace("Getting offsets %s for group %s.".format(topicPartitions, group))
 
     if (isGroupLocal(group)) {
       if (topicPartitions.isEmpty) {
         // Return offsets for all partitions owned by this consumer group. (this only applies to consumers that commit offsets to Kafka.)
         offsetsCache.filter(_._1.group == group).map { case(groupTopicPartition, offsetAndMetadata) =>
-          (groupTopicPartition.topicPartition, OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
+          (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code))
         }.toMap
       } else {
-        topicPartitions.map { topicAndPartition =>
-          val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
+        topicPartitions.map { topicPartition =>
+          val groupTopicPartition = GroupTopicPartition(group, topicPartition)
           (groupTopicPartition.topicPartition, getOffset(groupTopicPartition))
         }.toMap
       }
     } else {
       debug("Could not fetch offsets for group %s (not offset coordinator).".format(group))
-      topicPartitions.map { topicAndPartition =>
-        val groupTopicPartition = GroupTopicPartition(group, topicAndPartition)
-        (groupTopicPartition.topicPartition, OffsetMetadataAndError.NotCoordinatorForGroup)
+      topicPartitions.map { topicPartition =>
+        val groupTopicPartition = GroupTopicPartition(group, topicPartition)
+        (groupTopicPartition.topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NOT_COORDINATOR_FOR_GROUP.code))
       }.toMap
     }
   }
@@ -517,12 +518,12 @@ class GroupMetadataManager(val brokerId: Int,
    * @param key The requested group-topic-partition
    * @return If the key is present, return the offset and metadata; otherwise return None
    */
-  private def getOffset(key: GroupTopicPartition) = {
+  private def getOffset(key: GroupTopicPartition): OffsetFetchResponse.PartitionData = {
     val offsetAndMetadata = offsetsCache.get(key)
     if (offsetAndMetadata == null)
-      OffsetMetadataAndError.NoOffset
+      new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.NONE.code)
     else
-      OffsetMetadataAndError(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
+      new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset, offsetAndMetadata.metadata, Errors.NONE.code)
   }
 
   /**
@@ -872,7 +873,7 @@ object GroupMetadataManager {
       val topic = key.get(OFFSET_KEY_TOPIC_FIELD).asInstanceOf[String]
       val partition = key.get(OFFSET_KEY_PARTITION_FIELD).asInstanceOf[Int]
 
-      OffsetKey(version, GroupTopicPartition(group, TopicAndPartition(topic, partition)))
+      OffsetKey(version, GroupTopicPartition(group, new TopicPartition(topic, partition)))
 
     } else if (version == CURRENT_GROUP_KEY_SCHEMA_VERSION) {
       // version 2 refers to offset
@@ -1009,10 +1010,10 @@ object GroupMetadataManager {
 
 }
 
-case class GroupTopicPartition(group: String, topicPartition: TopicAndPartition) {
+case class GroupTopicPartition(group: String, topicPartition: TopicPartition) {
 
   def this(group: String, topic: String, partition: Int) =
-    this(group, new TopicAndPartition(topic, partition))
+    this(group, new TopicPartition(topic, partition))
 
   override def toString =
     "[%s,%s,%d]".format(group, topicPartition.topic, topicPartition.partition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 219e2fb..916c438 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -64,9 +64,7 @@ object RequestChannel extends Logging {
     private val keyToNameAndDeserializerMap: Map[Short, (ByteBuffer) => RequestOrResponse]=
       Map(ApiKeys.FETCH.id -> FetchRequest.readFrom,
         ApiKeys.METADATA.id -> TopicMetadataRequest.readFrom,
-        ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom,
-        ApiKeys.OFFSET_COMMIT.id -> OffsetCommitRequest.readFrom,
-        ApiKeys.OFFSET_FETCH.id -> OffsetFetchRequest.readFrom
+        ApiKeys.CONTROLLED_SHUTDOWN_KEY.id -> ControlledShutdownRequest.readFrom
       )
 
     // TODO: this will be removed once we migrated to client-side format

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 2a289b4..8f3a2ad 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -39,7 +39,8 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse,
 DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse,
 LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse,
-StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse}
+StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse,
+OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{TopicPartition, Node}
@@ -209,107 +210,108 @@ class KafkaApis(val requestChannel: RequestChannel,
    * Handle an offset commit request
    */
   def handleOffsetCommitRequest(request: RequestChannel.Request) {
-    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+    val header = request.header
+    val offsetCommitRequest = request.body.asInstanceOf[OffsetCommitRequest]
 
-    // reject the request immediately if not authorized to the group
+    // reject the request if not authorized to the group
     if (!authorize(request.session, Read, new Resource(Group, offsetCommitRequest.groupId))) {
-      val errors = offsetCommitRequest.requestInfo.mapValues(_ => Errors.GROUP_AUTHORIZATION_FAILED.code)
-      val response = OffsetCommitResponse(errors, offsetCommitRequest.correlationId)
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
-      return
-    }
-
-    // filter non-exist topics
-    val invalidRequestsInfo = offsetCommitRequest.requestInfo.filter { case (topicAndPartition, offsetMetadata) =>
-      !metadataCache.contains(topicAndPartition.topic)
-    }
-    val filteredRequestInfo = (offsetCommitRequest.requestInfo -- invalidRequestsInfo.keys)
+      val errorCode = new JShort(Errors.GROUP_AUTHORIZATION_FAILED.code)
+      val results = offsetCommitRequest.offsetData.keySet.asScala.map { topicPartition =>
+        (topicPartition, errorCode)
+      }.toMap
+      val responseHeader = new ResponseHeader(header.correlationId)
+      val responseBody = new OffsetCommitResponse(results.asJava)
+      requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
+    } else {
+      // filter non-existent topics
+      val invalidRequestsInfo = offsetCommitRequest.offsetData.asScala.filter { case (topicPartition, _) =>
+        !metadataCache.contains(topicPartition.topic)
+      }
+      val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys
 
-    val (authorizedRequestInfo, unauthorizedRequestInfo) =  filteredRequestInfo.partition {
-      case (topicAndPartition, offsetMetadata) =>
-        authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic))
-    }
+      val (authorizedRequestInfo, unauthorizedRequestInfo) =  filteredRequestInfo.partition {
+        case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic))
+      }
 
-    // the callback for sending an offset commit response
-    def sendResponseCallback(commitStatus: immutable.Map[TopicAndPartition, Short]) {
-      val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
+      // the callback for sending an offset commit response
+      def sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Short]) {
+        val mergedCommitStatus = commitStatus ++ unauthorizedRequestInfo.mapValues(_ => Errors.TOPIC_AUTHORIZATION_FAILED.code)
 
-      mergedCommitStatus.foreach { case (topicAndPartition, errorCode) =>
-        if (errorCode != Errors.NONE.code) {
-          debug("Offset commit request with correlation id %d from client %s on partition %s failed due to %s"
-            .format(offsetCommitRequest.correlationId, offsetCommitRequest.clientId,
-              topicAndPartition, Errors.forCode(errorCode).exceptionName))
+        mergedCommitStatus.foreach { case (topicPartition, errorCode) =>
+          if (errorCode != Errors.NONE.code) {
+            debug(s"Offset commit request with correlation id ${header.correlationId} from client ${header.clientId} " +
+              s"on partition $topicPartition failed due to ${Errors.forCode(errorCode).exceptionName}")
+          }
         }
+        val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code))
+
+        val responseHeader = new ResponseHeader(header.correlationId)
+        val responseBody =  new OffsetCommitResponse(combinedCommitStatus.asJava)
+        requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody)))
       }
-      val combinedCommitStatus = mergedCommitStatus ++ invalidRequestsInfo.map(_._1 -> Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-      val response = OffsetCommitResponse(combinedCommitStatus, offsetCommitRequest.correlationId)
-      requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
-    }
 
-    if (authorizedRequestInfo.isEmpty)
-      sendResponseCallback(Map.empty)
-    else if (offsetCommitRequest.versionId == 0) {
-      // for version 0 always store offsets to ZK
-      val responseInfo = authorizedRequestInfo.map {
-        case (topicAndPartition, metaAndError) => {
-          val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicAndPartition.topic)
-          try {
-            if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
-              (topicAndPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
-            } else if (metaAndError.metadata != null && metaAndError.metadata.length > config.offsetMetadataMaxSize) {
-              (topicAndPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
-            } else {
-              zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir + "/" +
-                topicAndPartition.partition, metaAndError.offset.toString)
-              (topicAndPartition, Errors.NONE.code)
+      if (authorizedRequestInfo.isEmpty)
+        sendResponseCallback(Map.empty)
+      else if (header.apiVersion == 0) {
+        // for version 0 always store offsets to ZK
+        val responseInfo = authorizedRequestInfo.map {
+          case (topicPartition, partitionData) =>
+            val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, topicPartition.topic)
+            try {
+              if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).size <= 0)
+                (topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+              else if (partitionData.metadata != null && partitionData.metadata.length > config.offsetMetadataMaxSize)
+                (topicPartition, Errors.OFFSET_METADATA_TOO_LARGE.code)
+              else {
+                zkUtils.updatePersistentPath(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}", partitionData.offset.toString)
+                (topicPartition, Errors.NONE.code)
+              }
+            } catch {
+              case e: Throwable => (topicPartition, Errors.forException(e).code)
             }
-          } catch {
-            case e: Throwable => (topicAndPartition, Errors.forException(e).code)
-          }
         }
-      }
-
-      sendResponseCallback(responseInfo)
-    } else {
-      // for version 1 and beyond store offsets in offset manager
-
-      // compute the retention time based on the request version:
-      // if it is v1 or not specified by user, we can use the default retention
-      val offsetRetention =
-        if (offsetCommitRequest.versionId <= 1 ||
-          offsetCommitRequest.retentionMs == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_RETENTION_TIME) {
-          coordinator.offsetConfig.offsetsRetentionMs
-        } else {
-          offsetCommitRequest.retentionMs
+        sendResponseCallback(responseInfo)
+      } else {
+        // for version 1 and beyond store offsets in offset manager
+
+        // compute the retention time based on the request version:
+        // if it is v1 or not specified by user, we can use the default retention
+        val offsetRetention =
+          if (header.apiVersion <= 1 ||
+            offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
+            coordinator.offsetConfig.offsetsRetentionMs
+          else
+            offsetCommitRequest.retentionTime
+
+        // commit timestamp is always set to now.
+        // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
+        // expire timestamp is computed differently for v1 and v2.
+        //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+        //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
+        //   - If v2 we use the default expiration timestamp
+        val currentTimestamp = SystemTime.milliseconds
+        val defaultExpireTimestamp = offsetRetention + currentTimestamp
+        val partitionData = authorizedRequestInfo.mapValues { partitionData =>
+          new OffsetAndMetadata(
+            offsetMetadata = OffsetMetadata(partitionData.offset, partitionData.metadata),
+            commitTimestamp = currentTimestamp,
+            expireTimestamp = {
+              if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
+                defaultExpireTimestamp
+              else
+                offsetRetention + partitionData.timestamp
+            }
+          )
         }
 
-      // commit timestamp is always set to now.
-      // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
-      // expire timestamp is computed differently for v1 and v2.
-      //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
-      //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
-      //   - If v2 we use the default expiration timestamp
-      val currentTimestamp = SystemTime.milliseconds
-      val defaultExpireTimestamp = offsetRetention + currentTimestamp
-      val offsetData = authorizedRequestInfo.mapValues(offsetAndMetadata =>
-        offsetAndMetadata.copy(
-          commitTimestamp = currentTimestamp,
-          expireTimestamp = {
-            if (offsetAndMetadata.commitTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-              defaultExpireTimestamp
-            else
-              offsetRetention + offsetAndMetadata.commitTimestamp
-          }
-        )
-      )
-
-      // call coordinator to handle commit offset
-      coordinator.handleCommitOffsets(
-        offsetCommitRequest.groupId,
-        offsetCommitRequest.memberId,
-        offsetCommitRequest.groupGenerationId,
-        offsetData,
-        sendResponseCallback)
+        // call coordinator to handle commit offset
+        coordinator.handleCommitOffsets(
+          offsetCommitRequest.groupId,
+          offsetCommitRequest.memberId,
+          offsetCommitRequest.generationId,
+          partitionData,
+          sendResponseCallback)
+      }
     }
   }
 
@@ -699,61 +701,61 @@ class KafkaApis(val requestChannel: RequestChannel,
   /*
    * Handle an offset fetch request
    */
-
   def handleOffsetFetchRequest(request: RequestChannel.Request) {
-    val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+    val header = request.header
+    val offsetFetchRequest = request.body.asInstanceOf[OffsetFetchRequest]
 
-    // reject the request immediately if not authorized to the group
+    val responseHeader = new ResponseHeader(header.correlationId)
+    val offsetFetchResponse =
+    // reject the request if not authorized to the group
     if (!authorize(request.session, Read, new Resource(Group, offsetFetchRequest.groupId))) {
-      val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.GROUP_AUTHORIZATION_FAILED.code)
-      val response = OffsetFetchResponse(offsetFetchRequest.requestInfo.map{ _ -> authorizationError}.toMap)
-      requestChannel.sendResponse(new Response(request, new RequestOrResponseSend(request.connectionId, response)))
-      return
-    }
-
-    val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.requestInfo.partition { topicAndPartition =>
-      authorize(request.session, Describe, new Resource(Topic, topicAndPartition.topic))
-    }
-
-    val authorizationError = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.TOPIC_AUTHORIZATION_FAILED.code)
-    val unauthorizedStatus = unauthorizedTopicPartitions.map(topicAndPartition => (topicAndPartition, authorizationError)).toMap
-
-    val response = if (offsetFetchRequest.versionId == 0) {
-      // version 0 reads offsets from ZK
-      val responseInfo = authorizedTopicPartitions.map( topicAndPartition => {
-        val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicAndPartition.topic)
-        try {
-          if (metadataCache.getTopicMetadata(Set(topicAndPartition.topic), request.securityProtocol).size <= 0) {
-            (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
-          } else {
-            val payloadOpt = zkUtils.readDataMaybeNull(topicDirs.consumerOffsetDir + "/" + topicAndPartition.partition)._1
-            payloadOpt match {
-              case Some(payload) => (topicAndPartition, OffsetMetadataAndError(payload.toLong))
-              case None => (topicAndPartition, OffsetMetadataAndError.UnknownTopicOrPartition)
+      val unauthorizedGroupResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.GROUP_AUTHORIZATION_FAILED.code)
+      val results = offsetFetchRequest.partitions.asScala.map { topicPartition => (topicPartition, unauthorizedGroupResponse)}.toMap
+      new OffsetFetchResponse(results.asJava)
+    } else {
+      val (authorizedTopicPartitions, unauthorizedTopicPartitions) = offsetFetchRequest.partitions.asScala.partition { topicPartition =>
+        authorize(request.session, Describe, new Resource(Topic, topicPartition.topic))
+      }
+      val unauthorizedTopicResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.TOPIC_AUTHORIZATION_FAILED.code)
+      val unauthorizedStatus = unauthorizedTopicPartitions.map(topicPartition => (topicPartition, unauthorizedTopicResponse)).toMap
+      val unknownTopicPartitionResponse = new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "", Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+
+      if (header.apiVersion == 0) {
+        // version 0 reads offsets from ZK
+        val responseInfo = authorizedTopicPartitions.map { topicPartition =>
+          val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, topicPartition.topic)
+          try {
+            if (metadataCache.getTopicMetadata(Set(topicPartition.topic), request.securityProtocol).isEmpty)
+              (topicPartition, unknownTopicPartitionResponse)
+            else {
+              val payloadOpt = zkUtils.readDataMaybeNull(s"${topicDirs.consumerOffsetDir}/${topicPartition.partition}")._1
+              payloadOpt match {
+                case Some(payload) =>
+                  (topicPartition, new OffsetFetchResponse.PartitionData(payload.toLong, "", Errors.NONE.code))
+                case None =>
+                  (topicPartition, unknownTopicPartitionResponse)
+              }
             }
+          } catch {
+            case e: Throwable =>
+              (topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET, "",
+                Errors.forException(e).code))
           }
-        } catch {
-          case e: Throwable =>
-            (topicAndPartition, OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata,
-              Errors.forException(e).code))
-        }
-      })
-
-      OffsetFetchResponse(collection.immutable.Map(responseInfo: _*) ++ unauthorizedStatus, offsetFetchRequest.correlationId)
-    } else {
-      // version 1 reads offsets from Kafka;
-      val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
+        }.toMap
+        new OffsetFetchResponse((responseInfo ++ unauthorizedStatus).asJava)
+      } else {
+        // version 1 reads offsets from Kafka;
+        val offsets = coordinator.handleFetchOffsets(offsetFetchRequest.groupId, authorizedTopicPartitions).toMap
 
-      // Note that we do not need to filter the partitions in the
-      // metadata cache as the topic partitions will be filtered
-      // in coordinator's offset manager through the offset cache
-      OffsetFetchResponse(offsets ++ unauthorizedStatus, offsetFetchRequest.correlationId)
+        // Note that we do not need to filter the partitions in the
+        // metadata cache as the topic partitions will be filtered
+        // in coordinator's offset manager through the offset cache
+        new OffsetFetchResponse((offsets ++ unauthorizedStatus).asJava)
+      }
     }
 
-    trace("Sending offset fetch response %s for correlation id %d to client %s."
-          .format(response, offsetFetchRequest.correlationId, offsetFetchRequest.clientId))
-
-    requestChannel.sendResponse(new RequestChannel.Response(request, new RequestOrResponseSend(request.connectionId, response)))
+    trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
+    requestChannel.sendResponse(new Response(request, new ResponseSend(request.connectionId, responseHeader, offsetFetchResponse)))
   }
 
   def handleGroupCoordinatorRequest(request: RequestChannel.Request) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index afba1ad..e882a30 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -65,7 +65,7 @@ object DumpLogSegments {
       CommandLineUtils.printUsageAndDie(parser, "Parse a log file and dump its contents to the console, useful for debugging a seemingly corrupt log segment.")
 
     val options = parser.parse(args : _*)
-    
+
     CommandLineUtils.checkRequiredArgs(parser, options, filesOpt)
 
     val print = if(options.has(printOpt)) true else false
@@ -75,7 +75,7 @@ object DumpLogSegments {
     val files = options.valueOf(filesOpt).split(",")
     val maxMessageSize = options.valueOf(maxMessageSizeOpt).intValue()
     val isDeepIteration = if(options.has(deepIterationOpt)) true else false
-  
+
     val messageParser = if (options.has(offsetsOpt)) {
       new OffsetsMessageParser
     } else {
@@ -114,7 +114,7 @@ object DumpLogSegments {
       }
     }
   }
-  
+
   /* print out the contents of the index */
   private def dumpIndex(file: File,
                         indexSanityOnly: Boolean,
@@ -181,10 +181,10 @@ object DumpLogSegments {
 
     private def parseOffsets(offsetKey: OffsetKey, payload: ByteBuffer) = {
       val group = offsetKey.key.group
-      val (topic, partition)  = offsetKey.key.topicPartition.asTuple
+      val topicPartition = offsetKey.key.topicPartition
       val offset = GroupMetadataManager.readOffsetMessageValue(payload)
 
-      val keyString = s"offset::${group}:${topic}:${partition}"
+      val keyString = s"offset::${group}:${topicPartition.topic}:${topicPartition.partition}"
       val valueString = if (offset.metadata.isEmpty)
         String.valueOf(offset.offset)
       else
@@ -306,5 +306,5 @@ object DumpLogSegments {
       }
     }
   }
-  
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8d0c298c/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
index 90e2b95..50fa09e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/GroupCoordinatorResponseTest.scala
@@ -48,8 +48,8 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   type SyncGroupCallback = (Array[Byte], Short) => Unit
   type HeartbeatCallbackParams = Short
   type HeartbeatCallback = Short => Unit
-  type CommitOffsetCallbackParams = Map[TopicAndPartition, Short]
-  type CommitOffsetCallback = Map[TopicAndPartition, Short] => Unit
+  type CommitOffsetCallbackParams = Map[TopicPartition, Short]
+  type CommitOffsetCallback = Map[TopicPartition, Short] => Unit
   type LeaveGroupCallbackParams = Short
   type LeaveGroupCallback = Short => Unit
 
@@ -574,7 +574,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testCommitOffsetFromUnknownGroup() {
     val generationId = 1
-    val tp = new TopicAndPartition("topic", 0)
+    val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, memberId, generationId, immutable.Map(tp -> offset))
@@ -583,7 +583,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
 
   @Test
   def testCommitOffsetWithDefaultGeneration() {
-    val tp = new TopicAndPartition("topic", 0)
+    val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
@@ -594,7 +594,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   @Test
   def testCommitOffsetInAwaitingSync() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
-    val tp = new TopicAndPartition("topic", 0)
+    val tp = new TopicPartition("topic", 0)
     val offset = OffsetAndMetadata(0)
 
     val joinGroupResult = joinGroup(groupId, memberId, DefaultSessionTimeout, protocolType, protocols)
@@ -900,7 +900,7 @@ class GroupCoordinatorResponseTest extends JUnitSuite {
   private def commitOffsets(groupId: String,
                             consumerId: String,
                             generationId: Int,
-                            offsets: immutable.Map[TopicAndPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
+                            offsets: immutable.Map[TopicPartition, OffsetAndMetadata]): CommitOffsetCallbackParams = {
     val (responseFuture, responseCallback) = setupCommitOffsetsCallback
 
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()