Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/04 23:30:59 UTC

git commit: KAFKA-657 Add APIs for the consumer to commit and fetch offsets on the broker.

Updated Branches:
  refs/heads/trunk a39c34a2d -> 222c0e46a


KAFKA-657 Add APIs for the consumer to commit and fetch offsets on the broker.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/222c0e46
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/222c0e46
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/222c0e46

Branch: refs/heads/trunk
Commit: 222c0e46ab78a12e1f58e3fb526667a4f73d344a
Parents: a39c34a
Author: David Arthur <mu...@gmail.com>
Authored: Fri Jan 4 14:30:03 2013 -0800
Committer: Jay Kreps <ja...@gmail.com>
Committed: Fri Jan 4 14:30:03 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/api/OffsetCommitRequest.scala |  100 +++++++++
 .../scala/kafka/api/OffsetCommitResponse.scala     |   93 ++++++++
 .../main/scala/kafka/api/OffsetFetchRequest.scala  |   89 ++++++++
 .../main/scala/kafka/api/OffsetFetchResponse.scala |  100 +++++++++
 core/src/main/scala/kafka/api/RequestKeys.scala    |    6 +-
 .../src/main/scala/kafka/common/ErrorMapping.scala |    4 +-
 .../kafka/common/OffsetMetadataAndError.scala      |   36 +++
 .../common/OffsetMetadataTooLargeException.scala   |   27 +++
 .../main/scala/kafka/consumer/SimpleConsumer.scala |   14 ++
 .../main/scala/kafka/javaapi/FetchRequest.scala    |    4 -
 core/src/main/scala/kafka/javaapi/Implicits.scala  |    6 +
 .../scala/kafka/javaapi/OffsetCommitRequest.scala  |   55 +++++
 .../scala/kafka/javaapi/OffsetCommitResponse.scala |   30 +++
 .../scala/kafka/javaapi/OffsetFetchRequest.scala   |   58 +++++
 .../scala/kafka/javaapi/OffsetFetchResponse.scala  |   30 +++
 .../main/scala/kafka/javaapi/OffsetRequest.scala   |    5 -
 .../kafka/javaapi/consumer/SimpleConsumer.scala    |   21 ++-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   88 +++++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |    3 +
 core/src/main/scala/kafka/server/KafkaServer.scala |    2 +-
 .../api/RequestResponseSerializationTest.scala     |   65 ++++++-
 .../unit/kafka/integration/TopicMetadataTest.scala |    4 +-
 .../scala/unit/kafka/server/OffsetCommitTest.scala |  174 +++++++++++++++
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |    4 +-
 24 files changed, 998 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
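For orientation before the diff: a minimal, hypothetical sketch of driving the new commit/fetch APIs through kafka.consumer.SimpleConsumer. Host, port, group, and topic names are placeholders; the constructor arguments follow the usage in the OffsetCommitTest added below.

    import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
    import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
    import kafka.consumer.SimpleConsumer

    object OffsetApiSketch {
      def main(args: Array[String]) {
        // Hypothetical broker address; soTimeout/bufferSize as in OffsetCommitTest.
        val consumer = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024, "offset-example")
        val tp = TopicAndPartition("my-topic", 0)
        // Commit offset 42 with a little metadata for this group/partition.
        val commitResponse = consumer.commitOffsets(OffsetCommitRequest("my-group",
          Map(tp -> OffsetMetadataAndError(offset = 42L, metadata = "checkpoint"))))
        assert(commitResponse.requestInfo(tp) == ErrorMapping.NoError)
        // Read it back; each partition carries an offset, metadata, and an error code.
        val fetchResponse = consumer.fetchOffsets(OffsetFetchRequest("my-group", Seq(tp)))
        println(fetchResponse.requestInfo(tp).offset)  // prints 42
        consumer.close()
      }
    }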


http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
new file mode 100644
index 0000000..0a34d5d
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import kafka.utils.Logging
+
+object OffsetCommitRequest extends Logging {
+  val CurrentVersion: Short = 0
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetCommitRequest
+    val consumerGroupId = readShortString(buffer)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        val metadata = readShortString(buffer)
+        (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
+      })
+    })
+    OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetCommitRequest(groupId: String,
+                               requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
+                               versionId: Short = OffsetCommitRequest.CurrentVersion,
+                               correlationId: Int = 0,
+                               clientId: String = OffsetCommitRequest.DefaultClientId)
+    extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey)) {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+  
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetCommitRequest
+    writeShortString(buffer, groupId)             // consumer group
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => {
+        buffer.putInt(t2._1.partition)  // partition
+        buffer.putLong(t2._2.offset)    // offset
+        writeShortString(buffer, t2._2.metadata) // metadata
+      })
+    })
+  }
+
+  override def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    shortStringLength(groupId) + 
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+      val (topic, offsets) = topicAndOffsets
+      count +
+      shortStringLength(topic) + /* topic */
+      4 + /* number of partitions */
+      offsets.foldLeft(0)((innerCount, offsetAndMetadata) => {
+        innerCount +
+        4 /* partition */ +
+        8 /* offset */ +
+        shortStringLength(offsetAndMetadata._2.metadata)
+      })
+    })
+}

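The writeTo/readFrom/sizeInBytes trio above fully defines the wire format. As a sketch, a round trip mirrors the new serialization test later in this commit (names hypothetical):

    import java.nio.ByteBuffer
    import kafka.api.OffsetCommitRequest
    import kafka.common.{TopicAndPartition, OffsetMetadataAndError}

    val request = OffsetCommitRequest("my-group",
      Map(TopicAndPartition("my-topic", 0) -> OffsetMetadataAndError(offset = 42L)))
    val buffer = ByteBuffer.allocate(request.sizeInBytes)  // size is computed up front
    request.writeTo(buffer)
    buffer.rewind()
    assert(OffsetCommitRequest.readFrom(buffer) == request)  // case-class equality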
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
new file mode 100644
index 0000000..4e1313e
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetCommitResponse extends Logging {
+  val CurrentVersion: Short = 0
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): OffsetCommitResponse = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetCommitResponse
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val error = buffer.getShort
+        (TopicAndPartition(topic, partitionId), error)
+      })
+    })
+    OffsetCommitResponse(Map(pairs:_*), versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
+                               versionId: Short = OffsetCommitResponse.CurrentVersion,
+                               correlationId: Int = 0,
+                               clientId: String = OffsetCommitResponse.DefaultClientId)
+    extends RequestOrResponse {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetCommitResponse
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => {  // TopicAndPartition -> Short
+        buffer.putInt(t2._1.partition)
+        buffer.putShort(t2._2)  //error
+      })
+    })
+  }
+
+  override def sizeInBytes = 
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+      val (topic, offsets) = topicAndOffsets
+      count +
+      shortStringLength(topic) + /* topic */
+      4 + /* number of partitions */
+      offsets.size * (
+        4 + /* partition */
+        2 /* error */
+      )
+    })
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
new file mode 100644
index 0000000..63c1349
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.TopicAndPartition
+import kafka.utils.Logging
+
+object OffsetFetchRequest extends Logging {
+  val CurrentVersion: Short = 0
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): OffsetFetchRequest = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetFetchRequest
+    val consumerGroupId = readShortString(buffer)
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        TopicAndPartition(topic, partitionId)
+      })
+    })
+    OffsetFetchRequest(consumerGroupId, pairs, versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetFetchRequest(groupId: String,
+                               requestInfo: Seq[TopicAndPartition],
+                               versionId: Short = OffsetFetchRequest.CurrentVersion,
+                               correlationId: Int = 0,
+                               clientId: String = OffsetFetchRequest.DefaultClientId)
+    extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey)) {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
+  
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetFetchRequest
+    writeShortString(buffer, groupId)             // consumer group
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // (topic, Seq[TopicAndPartition])
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => {
+        buffer.putInt(t2.partition)
+      })
+    })
+  }
+
+  override def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    shortStringLength(groupId) + 
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, t) => {
+      count + shortStringLength(t._1) + /* topic */
+      4 + /* number of partitions */
+      t._2.size * 4 /* partition */
+    })
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchResponse.scala b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
new file mode 100644
index 0000000..fb5e6cb
--- /dev/null
+++ b/core/src/main/scala/kafka/api/OffsetFetchResponse.scala
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+
+import kafka.api.ApiUtils._
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import kafka.utils.Logging
+
+object OffsetFetchResponse extends Logging {
+  val CurrentVersion: Short = 0
+  val DefaultClientId = ""
+
+  def readFrom(buffer: ByteBuffer): OffsetFetchResponse = {
+    // Read values from the envelope
+    val versionId = buffer.getShort
+    val correlationId = buffer.getInt
+    val clientId = readShortString(buffer)
+
+    // Read the OffsetFetchResponse
+    val topicCount = buffer.getInt
+    val pairs = (1 to topicCount).flatMap(_ => {
+      val topic = readShortString(buffer)
+      val partitionCount = buffer.getInt
+      (1 to partitionCount).map(_ => {
+        val partitionId = buffer.getInt
+        val offset = buffer.getLong
+        val metadata = readShortString(buffer)
+        val error = buffer.getShort
+        (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata, error))
+      })
+    })
+    OffsetFetchResponse(Map(pairs:_*), versionId, correlationId, clientId)
+  }
+}
+
+case class OffsetFetchResponse(requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
+                               versionId: Short = OffsetFetchResponse.CurrentVersion,
+                               correlationId: Int = 0,
+                               clientId: String = OffsetFetchResponse.DefaultClientId)
+    extends RequestOrResponse {
+
+  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+
+  def writeTo(buffer: ByteBuffer) {
+    // Write envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    writeShortString(buffer, clientId)
+
+    // Write OffsetFetchResponse
+    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
+    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, OffsetMetadataAndError]
+      writeShortString(buffer, t1._1) // topic
+      buffer.putInt(t1._2.size)       // number of partitions for this topic
+      t1._2.foreach( t2 => { // TopicAndPartition -> OffsetMetadataAndError 
+        buffer.putInt(t2._1.partition)
+        buffer.putLong(t2._2.offset)
+        writeShortString(buffer, t2._2.metadata)
+        buffer.putShort(t2._2.error)
+      })
+    })
+  }
+
+  override def sizeInBytes = 
+    2 + /* versionId */
+    4 + /* correlationId */
+    shortStringLength(clientId) +
+    4 + /* topic count */
+    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
+      val (topic, offsets) = topicAndOffsets
+      count +
+      shortStringLength(topic) + /* topic */
+      4 + /* number of partitions */
+      offsets.foldLeft(0)((innerCount, offsetsAndMetadata) => {
+        innerCount +
+        4 /* partition */ +
+        8 /* offset */ +
+        shortStringLength(offsetsAndMetadata._2.metadata) +
+        2 /* error */
+      })
+    })
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index b000eb7..89ce92a 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -27,6 +27,8 @@ object RequestKeys {
   val MetadataKey: Short = 3
   val LeaderAndIsrKey: Short = 4
   val StopReplicaKey: Short = 5
+  val OffsetCommitKey: Short = 6
+  val OffsetFetchKey: Short = 7
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -34,7 +36,9 @@ object RequestKeys {
         OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
-        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom))
+        StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom),
+        OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
+        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index c8769e0..153bc0b 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -41,6 +41,7 @@ object ErrorMapping {
   val ReplicaNotAvailableCode: Short = 9
   val MessageSizeTooLargeCode: Short = 10
   val StaleControllerEpochCode: Short = 11
+  val OffsetMetadataTooLargeCode: Short = 12
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -54,7 +55,8 @@ object ErrorMapping {
       classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
       classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
-      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode
+      classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode,
+      classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
new file mode 100644
index 0000000..59608a3
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -0,0 +1,36 @@
+package kafka.common
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Convenience case class for an offset, its associated metadata, and an error code.
+ */
+case class OffsetMetadataAndError(offset: Long, metadata: String = OffsetMetadataAndError.NoMetadata, error: Short = ErrorMapping.NoError) {
+
+  def this(tuple: (Long, String, Short)) = this(tuple._1, tuple._2, tuple._3)
+
+  def asTuple = (offset, metadata, error)
+
+  override def toString = "OffsetMetadataAndError[%d,%s,%d]".format(offset, metadata, error)
+
+}
+
+object OffsetMetadataAndError {
+  val InvalidOffset: Long = -1L
+  val NoMetadata: String = ""
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala b/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala
new file mode 100644
index 0000000..50edb27
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetMetadataTooLargeException.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates the client has specified offset metadata that exceeds the configured
+ * maximum size in bytes.
+ */
+class OffsetMetadataTooLargeException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index 6b83deb..cd8ef0b 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -175,6 +175,20 @@ class SimpleConsumer(val host: String,
    */
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).buffer)
 
+  /**
+   * Commit offsets for a topic
+   * @param request a [[kafka.api.OffsetCommitRequest]] object.
+   * @return a [[kafka.api.OffsetCommitResponse]] object.
+   */
+  def commitOffsets(request: OffsetCommitRequest) = OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+
+  /**
+   * Fetch offsets for a topic
+   * @param request a [[kafka.api.OffsetFetchRequest]] object.
+   * @return a [[kafka.api.OffsetFetchResponse]] object.
+   */
+  def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).buffer)
+
   private def getOrMakeConnection() {
     if(!blockingChannel.isConnected) {
       connect()

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/FetchRequest.scala b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
index 44d148e..942427a 100644
--- a/core/src/main/scala/kafka/javaapi/FetchRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/FetchRequest.scala
@@ -42,10 +42,6 @@ class FetchRequest(correlationId: Int,
     )
   }
 
-  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
-
-  def sizeInBytes = underlying.sizeInBytes
-
   override def toString = underlying.toString
 
   override def equals(other: Any) = canEqual(other) && {

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/Implicits.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/Implicits.scala b/core/src/main/scala/kafka/javaapi/Implicits.scala
index cf82b38..66ab821 100644
--- a/core/src/main/scala/kafka/javaapi/Implicits.scala
+++ b/core/src/main/scala/kafka/javaapi/Implicits.scala
@@ -34,6 +34,12 @@ private[javaapi] object Implicits extends Logging {
   implicit def toJavaOffsetResponse(response: kafka.api.OffsetResponse): kafka.javaapi.OffsetResponse =
     new kafka.javaapi.OffsetResponse(response)
 
+  implicit def toJavaOffsetFetchResponse(response: kafka.api.OffsetFetchResponse): kafka.javaapi.OffsetFetchResponse =
+    new kafka.javaapi.OffsetFetchResponse(response)
+
+  implicit def toJavaOffsetCommitResponse(response: kafka.api.OffsetCommitResponse): kafka.javaapi.OffsetCommitResponse =
+    new kafka.javaapi.OffsetCommitResponse(response)
+
   implicit def optionToJavaRef[T](opt: Option[T]): T = {
     opt match {
       case Some(obj) => obj

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
new file mode 100644
index 0000000..32033d6
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import collection.JavaConversions
+import java.nio.ByteBuffer
+
+class OffsetCommitRequest(groupId: String,
+                          requestInfo: java.util.Map[TopicAndPartition, OffsetMetadataAndError],
+                          versionId: Short,
+                          correlationId: Int,
+                          clientId: String) {
+  val underlying = {
+    val scalaMap = JavaConversions.asMap(requestInfo).toMap
+    kafka.api.OffsetCommitRequest(
+      groupId = groupId,
+      requestInfo = scalaMap,
+      versionId = versionId,
+      correlationId = correlationId,
+      clientId = clientId
+    )
+  }
+
+
+  override def toString = underlying.toString
+
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetCommitRequest]
+    this.underlying.equals(otherOffsetRequest.underlying)
+  }
+
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetCommitRequest]
+
+
+  override def hashCode = underlying.hashCode
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
new file mode 100644
index 0000000..d1c50c4
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+import collection.JavaConversions
+
+class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitResponse) {
+
+  def errors: java.util.Map[TopicAndPartition, Short] = {
+    JavaConversions.asMap(underlying.requestInfo) 
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
new file mode 100644
index 0000000..64d134b
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchRequest.scala
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.TopicAndPartition
+import collection.JavaConversions
+import java.nio.ByteBuffer
+
+class OffsetFetchRequest(groupId: String,
+                         requestInfo: java.util.List[TopicAndPartition],
+                         versionId: Short,
+                         correlationId: Int,
+                         clientId: String) {
+
+  val underlying = {
+    val scalaSeq = JavaConversions.asBuffer(requestInfo)
+    kafka.api.OffsetFetchRequest(
+      groupId = groupId,
+      requestInfo = scalaSeq,
+      versionId = versionId,
+      correlationId = correlationId,
+      clientId = clientId
+    )
+  }
+
+
+  override def toString = underlying.toString
+
+
+  override def equals(other: Any) = canEqual(other) && {
+    val otherOffsetRequest = other.asInstanceOf[kafka.javaapi.OffsetFetchRequest]
+    this.underlying.equals(otherOffsetRequest.underlying)
+  }
+
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.OffsetFetchRequest]
+
+
+  override def hashCode = underlying.hashCode
+
+}
+
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
new file mode 100644
index 0000000..9f83c1b
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/OffsetFetchResponse.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
+import collection.JavaConversions
+
+class OffsetFetchResponse(private val underlying: kafka.api.OffsetFetchResponse) {
+
+  def offsets: java.util.Map[TopicAndPartition, OffsetMetadataAndError] = {
+    JavaConversions.asMap(underlying.requestInfo)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
index 1c77ff8..3565a15 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetRequest.scala
@@ -38,11 +38,6 @@ class OffsetRequest(requestInfo: java.util.Map[TopicAndPartition, PartitionOffse
   }
 
 
-  def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
-
-
-  def sizeInBytes = underlying.sizeInBytes
-
 
   override def toString = underlying.toString
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
index 58c7081..0ab0195 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/SimpleConsumer.scala
@@ -21,7 +21,6 @@ import kafka.utils.threadsafe
 import kafka.javaapi.FetchResponse
 import kafka.javaapi.OffsetRequest
 
-
 /**
  * A consumer of kafka messages
  */
@@ -80,6 +79,26 @@ class SimpleConsumer(val host: String,
     underlying.getOffsetsBefore(request.underlying)
   }
 
+  /**
+   * Commit offsets for a topic
+   * @param request a [[kafka.javaapi.OffsetCommitRequest]] object.
+   * @return a [[kafka.javaapi.OffsetCommitResponse]] object.
+   */
+  def commitOffsets(request: kafka.javaapi.OffsetCommitRequest): kafka.javaapi.OffsetCommitResponse = {
+    import kafka.javaapi.Implicits._
+    underlying.commitOffsets(request.underlying)
+  }
+
+  /**
+   * Fetch offsets for a topic
+   * @param request a [[kafka.javaapi.OffsetFetchRequest]] object.
+   * @return a [[kafka.javaapi.OffsetFetchResponse]] object.
+   */
+  def fetchOffsets(request: kafka.javaapi.OffsetFetchRequest): kafka.javaapi.OffsetFetchResponse = {
+    import kafka.javaapi.Implicits._
+    underlying.fetchOffsets(request.underlying)
+  }
+
   def close() {
     underlying.close
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index ef3b66e..4283973 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -22,7 +22,7 @@ import kafka.api._
 import kafka.message._
 import kafka.network._
 import kafka.log._
-import kafka.utils.{Pool, SystemTime, Logging}
+import kafka.utils.{Pool, SystemTime, Logging, ZkUtils, ZKGroupTopicDirs}
 import org.apache.log4j.Logger
 import scala.collection._
 import kafka.network.RequestChannel.Response
@@ -39,7 +39,8 @@ import kafka.common._
 class KafkaApis(val requestChannel: RequestChannel,
                 val replicaManager: ReplicaManager,
                 val zkClient: ZkClient,
-                brokerId: Int) extends Logging {
+                val brokerId: Int,
+                val config: KafkaConfig) extends Logging {
 
   private val producerRequestPurgatory =
     new ProducerRequestPurgatory(replicaManager.config.producerRequestPurgatoryPurgeInterval)
@@ -62,6 +63,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
         case RequestKeys.LeaderAndIsrKey => handleLeaderAndIsrRequest(request)
         case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+        case RequestKeys.OffsetCommitKey => handleOffsetCommitRequest(request)
+        case RequestKeys.OffsetFetchKey => handleOffsetFetchRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -118,6 +121,25 @@ class KafkaApis(val requestChannel: RequestChannel,
             error("error when handling request %s".format(apiRequest), e)
             val errorResponse = StopReplicaResponse(apiRequest.correlationId, responseMap)
             requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+          case RequestKeys.OffsetCommitKey =>
+            val apiRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+            val responseMap = apiRequest.requestInfo.map {
+              case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+            }.toMap
+            error("error when handling request %s".format(apiRequest), e)
+            val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=apiRequest.correlationId)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+          case RequestKeys.OffsetFetchKey =>
+            val apiRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+            val responseMap = apiRequest.requestInfo.map {
+              case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
+                offset=OffsetMetadataAndError.InvalidOffset, 
+                error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+              ))
+            }.toMap
+            error("error when handling request %s".format(apiRequest), e)
+            val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=apiRequest.correlationId)
+            requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
         }
     } finally
       request.apiLocalCompleteTimeMs = SystemTime.milliseconds
@@ -525,6 +547,68 @@ class KafkaApis(val requestChannel: RequestChannel,
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
 
+  /* 
+   * Service the Offset commit API
+   */
+  def handleOffsetCommitRequest(request: RequestChannel.Request) {
+    val offsetCommitRequest = request.requestObj.asInstanceOf[OffsetCommitRequest]
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Handling offset commit request " + offsetCommitRequest.toString)
+    trace("Handling offset commit request " + offsetCommitRequest.toString)
+    val responseInfo = offsetCommitRequest.requestInfo.map( t => {
+      val topicDirs = new ZKGroupTopicDirs(offsetCommitRequest.groupId, t._1.topic)
+      try {
+        if(t._2.metadata.length > config.offsetMetadataMaxSize) {
+          (t._1, ErrorMapping.OffsetMetadataTooLargeCode)
+        } else {
+          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" +
+            t._1.partition, t._2.offset.toString)
+          (t._1, ErrorMapping.NoError)
+        }
+      } catch {
+        case e => 
+          (t._1, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
+      }
+    })
+    val response = new OffsetCommitResponse(responseInfo, 
+                                        offsetCommitRequest.versionId, 
+                                        offsetCommitRequest.correlationId,
+                                        offsetCommitRequest.clientId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
+  /*
+   * Service the Offset fetch API
+   */
+  def handleOffsetFetchRequest(request: RequestChannel.Request) {
+    val offsetFetchRequest = request.requestObj.asInstanceOf[OffsetFetchRequest]
+    if(requestLogger.isTraceEnabled)
+      requestLogger.trace("Handling offset fetch request " + offsetFetchRequest.toString)
+    trace("Handling offset fetch request " + offsetFetchRequest.toString)
+    val responseInfo = offsetFetchRequest.requestInfo.map( t => {
+      val topicDirs = new ZKGroupTopicDirs(offsetFetchRequest.groupId, t.topic)
+      try {
+        val payloadOpt = ZkUtils.readDataMaybeNull(zkClient, topicDirs.consumerOffsetDir + "/" + t.partition)._1
+        payloadOpt match {
+          case Some(payload) => {
+            (t, OffsetMetadataAndError(offset=payload.toLong, error=ErrorMapping.NoError))
+          } 
+          case None => (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
+                          ErrorMapping.UnknownTopicOrPartitionCode))
+        }
+      } catch {
+        case e => 
+          (t, OffsetMetadataAndError(OffsetMetadataAndError.InvalidOffset, OffsetMetadataAndError.NoMetadata,
+             ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])))
+      }
+    })
+    val response = new OffsetFetchResponse(collection.immutable.Map(responseInfo: _*), 
+                                        offsetFetchRequest.versionId, 
+                                        offsetFetchRequest.correlationId,
+                                        offsetFetchRequest.clientId)
+    requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
+  }
+
   def close() {
     debug("Shutting down.")
     fetchRequestPurgatory.shutdown()

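Both handlers are thin wrappers over ZooKeeper, so offsets committed through this API share the layout used by the high-level consumer: group g, topic t, partition p lands under the persistent path /consumers/g/offsets/t/p. A sketch of inspecting a committed offset with the same helpers the handlers use (ZooKeeper address and names hypothetical):

    import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer}
    import org.I0Itec.zkclient.ZkClient

    // Hypothetical ZooKeeper address; ZKStringSerializer matches what the broker uses.
    val zkClient = new ZkClient("localhost:2181", 3000, 3000, ZKStringSerializer)
    // consumerOffsetDir resolves to /consumers/my-group/offsets/my-topic.
    val dirs = new ZKGroupTopicDirs("my-group", "my-topic")
    val offsetOpt = ZkUtils.readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/0")._1
    println(offsetOpt.map(_.toLong))  // Some(42) after a commit of 42, None if nothing stored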
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 19c797c..264a411 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -173,4 +173,7 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the purge interval (in number of requests) of the producer request purgatory */
   val producerRequestPurgatoryPurgeInterval = props.getInt("producer.purgatory.purge.interval", 10000)
 
+  /*********** Misc configuration ***********/
+  val offsetMetadataMaxSize = props.getInt("offset.metadata.max.size", 1024)
+
  }

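The new knob defaults to 1024 bytes of metadata per committed offset. A sketch of overriding it when building a broker config (property name from the diff above; the value and the remaining required broker properties are placeholders):

    val props = new java.util.Properties()
    // ...plus the usual required broker properties (log.dir, etc.)...
    props.put("offset.metadata.max.size", "2048")  // permit up to 2 KB of metadata
    val config = new KafkaConfig(props)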
http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 50ece6f..97416d7 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -80,7 +80,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, logManager)
 
     kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId)
+    apis = new KafkaApis(socketServer.requestChannel, replicaManager, kafkaZookeeper.getZookeeperClient, config.brokerId, config)
     requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
index 509b020..c4b2986 100644
--- a/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
+++ b/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
@@ -24,7 +24,7 @@ import java.nio.ByteBuffer
 import kafka.message.{Message, ByteBufferMessageSet}
 import kafka.cluster.Broker
 import collection.mutable._
-import kafka.common.{TopicAndPartition, ErrorMapping}
+import kafka.common.{TopicAndPartition, ErrorMapping, OffsetMetadataAndError}
 import kafka.controller.LeaderIsrAndControllerEpoch
 
 
@@ -144,6 +144,36 @@ object SerializationTestUtils{
   def createTestTopicMetadataResponse: TopicMetadataResponse = {
     new TopicMetadataResponse(Seq(topicmetaData1, topicmetaData2), 1)
   }
+
+  def createTestOffsetCommitRequest: OffsetCommitRequest = {
+    new OffsetCommitRequest("group 1", collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="some metadata"),
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(offset=100L, metadata=OffsetMetadataAndError.NoMetadata)
+    ))
+  }
+
+  def createTestOffsetCommitResponse: OffsetCommitResponse = {
+    new OffsetCommitResponse(collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> ErrorMapping.NoError,
+      TopicAndPartition(topic1, 1) -> ErrorMapping.UnknownTopicOrPartitionCode
+    ))
+  }
+
+  def createTestOffsetFetchRequest: OffsetFetchRequest = {
+    new OffsetFetchRequest("group 1", Seq(
+      TopicAndPartition(topic1, 0),
+      TopicAndPartition(topic1, 1)
+    ))
+  }
+
+  def createTestOffsetFetchResponse: OffsetFetchResponse = {
+    new OffsetFetchResponse(collection.immutable.Map(
+      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(42L, "some metadata", ErrorMapping.NoError),
+      TopicAndPartition(topic1, 1) -> OffsetMetadataAndError(100L, OffsetMetadataAndError.NoMetadata,
+        ErrorMapping.UnknownTopicOrPartitionCode)
+    ))
+  }
+
 }
 
 class RequestResponseSerializationTest extends JUnitSuite {
@@ -158,6 +188,10 @@ class RequestResponseSerializationTest extends JUnitSuite {
   private val offsetResponse = SerializationTestUtils.createTestOffsetResponse
   private val topicMetadataRequest = SerializationTestUtils.createTestTopicMetadataRequest
   private val topicMetadataResponse = SerializationTestUtils.createTestTopicMetadataResponse
+  private val offsetCommitRequest = SerializationTestUtils.createTestOffsetCommitRequest
+  private val offsetCommitResponse = SerializationTestUtils.createTestOffsetCommitResponse
+  private val offsetFetchRequest = SerializationTestUtils.createTestOffsetFetchRequest
+  private val offsetFetchResponse = SerializationTestUtils.createTestOffsetFetchResponse
 
 
   @Test
@@ -238,5 +272,34 @@ class RequestResponseSerializationTest extends JUnitSuite {
     val deserializedTopicMetadataResponse = TopicMetadataResponse.readFrom(buffer)
     assertEquals("The original and deserialzed topicMetadataResponse should be the same", topicMetadataResponse,
                  deserializedTopicMetadataResponse)
+
+    buffer = ByteBuffer.allocate(offsetCommitRequest.sizeInBytes)
+    offsetCommitRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetCommitRequest = OffsetCommitRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed offsetCommitRequest should be the same", offsetCommitRequest, 
+      deserializedOffsetCommitRequest)
+
+    buffer = ByteBuffer.allocate(offsetCommitResponse.sizeInBytes)
+    offsetCommitResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetCommitResponse = OffsetCommitResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed offsetCommitResponse should be the same", offsetCommitResponse, 
+      deserializedOffsetCommitResponse)
+
+    buffer = ByteBuffer.allocate(offsetFetchRequest.sizeInBytes)
+    offsetFetchRequest.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetFetchRequest = OffsetFetchRequest.readFrom(buffer)
+    assertEquals("The original and deserialzed offsetFetchRequest should be the same", offsetFetchRequest, 
+      deserializedOffsetFetchRequest)
+
+    buffer = ByteBuffer.allocate(offsetFetchResponse.sizeInBytes)
+    offsetFetchResponse.writeTo(buffer)
+    buffer.rewind()
+    val deserializedOffsetFetchResponse = OffsetFetchResponse.readFrom(buffer)
+    assertEquals("The original and deserialzed offsetFetchResponse should be the same", offsetFetchResponse, 
+      deserializedOffsetFetchResponse)
+
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
index 54a5a06..230119b 100644
--- a/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
@@ -107,7 +107,7 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1, configs.head)
 
     // call the API (to be tested) to get metadata
     apis.handleTopicMetadataRequest(new RequestChannel.Request
@@ -119,4 +119,4 @@ class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     topicMetadata
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
new file mode 100644
index 0000000..48d5647
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.io.File
+import kafka.utils._
+import junit.framework.Assert._
+import java.util.Properties
+import kafka.consumer.SimpleConsumer
+import org.junit.{After, Before, Test}
+import kafka.message.{NoCompressionCodec, ByteBufferMessageSet, Message}
+import kafka.zk.ZooKeeperTestHarness
+import org.scalatest.junit.JUnit3Suite
+import kafka.admin.CreateTopicCommand
+import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
+import kafka.utils.TestUtils._
+import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
+import scala.util.Random
+
+class OffsetCommitTest extends JUnit3Suite with ZooKeeperTestHarness {
+  val random: Random = new Random()
+  var logDir: File = null
+  var topicLogDir: File = null
+  var server: KafkaServer = null
+  var logSize: Int = 100
+  val brokerPort: Int = 9099
+  var simpleConsumer: SimpleConsumer = null
+  var time: Time = new MockTime()
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val config: Properties = createBrokerConfig(1, brokerPort)
+    val logDirPath = config.getProperty("log.dir")
+    logDir = new File(logDirPath)
+    time = new MockTime()
+    server = TestUtils.createServer(new KafkaConfig(config), time)
+    simpleConsumer = new SimpleConsumer("localhost", brokerPort, 1000000, 64*1024, "test-client")
+  }
+
+  @After
+  override def tearDown() {
+    simpleConsumer.close
+    server.shutdown
+    Utils.rm(logDir)
+    super.tearDown()
+  }
+
+  @Test
+  def testUpdateOffsets() {
+    val topic = "topic"
+
+    // Commit an offset
+    val topicAndPartition = TopicAndPartition(topic, 0)
+    val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(offset=42L)))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+
+    // Fetch it and verify
+    val fetchRequest = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+    val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
+
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(topicAndPartition).get.error)
+    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(topicAndPartition).get.metadata)
+    assertEquals(42L, fetchResponse.requestInfo.get(topicAndPartition).get.offset)
+
+    // Commit a new offset
+    val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+      offset=100L,
+      metadata="some metadata"
+    )))
+    val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
+
+    assertEquals(ErrorMapping.NoError, commitResponse1.requestInfo.get(topicAndPartition).get)
+
+    // Fetch it and verify
+    val fetchRequest1 = OffsetFetchRequest("test-group", Seq(topicAndPartition))
+    val fetchResponse1 = simpleConsumer.fetchOffsets(fetchRequest1)
+
+    assertEquals(ErrorMapping.NoError, fetchResponse1.requestInfo.get(topicAndPartition).get.error)
+    //assertEquals("some metadata", fetchResponse1.requestInfo.get(topicAndPartition).get.metadata)
+    assertEquals(100L, fetchResponse1.requestInfo.get(topicAndPartition).get.offset)
+  }
+
+  @Test
+  def testCommitAndFetchOffsets() {
+    val topic1 = "topic-1"
+    val topic2 = "topic-2"
+    val topic3 = "topic-3"
+    val topic4 = "topic-4"
+
+    val commitRequest = OffsetCommitRequest("test-group", Map(
+      TopicAndPartition(topic1, 0) -> OffsetMetadataAndError(offset=42L, metadata="metadata one"), 
+      TopicAndPartition(topic2, 0) -> OffsetMetadataAndError(offset=43L, metadata="metadata two"),
+      TopicAndPartition(topic3, 0) -> OffsetMetadataAndError(offset=44L, metadata="metadata three"),
+      TopicAndPartition(topic2, 1) -> OffsetMetadataAndError(offset=45L)
+    ))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get)
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get)
+
+    val fetchRequest = OffsetFetchRequest("test-group", Seq(
+      TopicAndPartition(topic1, 0),
+      TopicAndPartition(topic2, 0),
+      TopicAndPartition(topic3, 0),
+      TopicAndPartition(topic2, 1),
+      TopicAndPartition(topic3, 1), // An unknown partition
+      TopicAndPartition(topic4, 0)  // An unknown topic
+    ))
+    val fetchResponse = simpleConsumer.fetchOffsets(fetchRequest)
+
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.error)
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.error)
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.error)
+    assertEquals(ErrorMapping.NoError, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.error)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.error)
+    assertEquals(ErrorMapping.UnknownTopicOrPartitionCode, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.error)
+
+    //assertEquals("metadata one", fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.metadata)
+    //assertEquals("metadata two", fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.metadata)
+    //assertEquals("metadata three", fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.metadata)
+    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.metadata)
+    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.metadata)
+    //assertEquals(OffsetMetadataAndError.NoMetadata, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.metadata)
+
+    assertEquals(42L, fetchResponse.requestInfo.get(TopicAndPartition(topic1, 0)).get.offset)
+    assertEquals(43L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 0)).get.offset)
+    assertEquals(44L, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 0)).get.offset)
+    assertEquals(45L, fetchResponse.requestInfo.get(TopicAndPartition(topic2, 1)).get.offset)
+    assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic3, 1)).get.offset)
+    assertEquals(OffsetMetadataAndError.InvalidOffset, fetchResponse.requestInfo.get(TopicAndPartition(topic4, 0)).get.offset)
+  }
+
+  @Test
+  def testLargeMetadataPayload() {
+    val topicAndPartition = TopicAndPartition("large-metadata", 0)
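+    // A commit whose metadata is exactly offsetMetadataMaxSize bytes should be accepted.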
+    val commitRequest = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+      offset=42L,
+      metadata=random.nextString(server.config.offsetMetadataMaxSize)
+    )))
+    val commitResponse = simpleConsumer.commitOffsets(commitRequest)
+
+    assertEquals(ErrorMapping.NoError, commitResponse.requestInfo.get(topicAndPartition).get)
+
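+    // One byte over the broker's limit should be rejected with OffsetMetadataTooLargeCode.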
+    val commitRequest1 = OffsetCommitRequest("test-group", Map(topicAndPartition -> OffsetMetadataAndError(
+      offset=42L,
+      metadata=random.nextString(server.config.offsetMetadataMaxSize + 1)
+    )))
+    val commitResponse1 = simpleConsumer.commitOffsets(commitRequest1)
+
+    assertEquals(ErrorMapping.OffsetMetadataTooLargeCode, commitResponse1.requestInfo.get(topicAndPartition).get)
+  }
+
+}
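
For readers skimming the diff, OffsetCommitTest doubles as usage documentation
for the new consumer-side offset APIs. Below is a minimal client sketch, not
part of this commit: the host, port, topic, and group names are illustrative
assumptions, and it uses only the calls exercised by the test
(OffsetCommitRequest, OffsetFetchRequest, and the new
SimpleConsumer.commitOffsets/fetchOffsets).

    import kafka.api.{OffsetCommitRequest, OffsetFetchRequest}
    import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
    import kafka.consumer.SimpleConsumer

    object OffsetClientSketch {
      def main(args: Array[String]) {
        // All connection and naming parameters here are illustrative assumptions.
        val consumer = new SimpleConsumer("localhost", 9092, 1000000, 64 * 1024, "offset-client")
        val tp = TopicAndPartition("my-topic", 0)

        // Commit offset 42 with a small metadata string for one partition.
        val commit = OffsetCommitRequest("my-group",
          Map(tp -> OffsetMetadataAndError(offset = 42L, metadata = "checkpoint")))
        consumer.commitOffsets(commit).requestInfo.get(tp).foreach {
          case ErrorMapping.NoError => println("commit ok")
          case ErrorMapping.OffsetMetadataTooLargeCode => println("metadata over broker limit")
          case code => println("commit failed with error code " + code)
        }

        // Fetch the committed offset back and check the per-partition error code.
        val fetched = consumer.fetchOffsets(OffsetFetchRequest("my-group", Seq(tp))).requestInfo(tp)
        if (fetched.error == ErrorMapping.NoError)
          println("offset=" + fetched.offset + " metadata=" + fetched.metadata)

        consumer.close()
      }
    }

Per-partition error handling here mirrors testCommitAndFetchOffsets: unknown
topics and partitions are reported individually (UnknownTopicOrPartitionCode
with OffsetMetadataAndError.InvalidOffset as the offset) rather than failing
the request as a whole.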

http://git-wip-us.apache.org/repos/asf/kafka/blob/222c0e46/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 624d852..f3454f6 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -89,7 +89,7 @@ class SimpleFetchTest extends JUnit3Suite {
     // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
     // don't provide replica or leader callbacks since they will not be tested here
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head)
 
     // This request (from a follower) wants to read up to 2*HW but should only get back up to HW bytes into the log
     val goodFetch = new FetchRequestBuilder()
@@ -156,7 +156,7 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.replay(replicaManager)
 
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId, configs.head)
 
     /**
      * This fetch, coming from a replica, requests all data at offset "15".  Because the request is coming