You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/03/14 23:14:51 UTC

[5/5] git commit: KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini

KAFKA-1012; Consumer offset management in Kafka; patched by Tejas Patil and Joel Koshy; feedback and reviews from Neha Narkhede, Jun Rao, Guozhang Wang, Sriram Subramanian, Joe Stein, Chris Riccomini


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a670537a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a670537a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a670537a

Branch: refs/heads/trunk
Commit: a670537aa33732b15b56644d8ccc1681e16395f5
Parents: 84a3a9a
Author: Joel Koshy <jj...@gmail.com>
Authored: Fri Mar 14 15:14:33 2014 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri Mar 14 15:14:33 2014 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/admin/TopicCommand.scala   |   7 +-
 .../kafka/api/ConsumerMetadataRequest.scala     |  79 +++
 .../kafka/api/ConsumerMetadataResponse.scala    |  57 +++
 .../scala/kafka/api/OffsetCommitRequest.scala   |  57 ++-
 .../scala/kafka/api/OffsetCommitResponse.scala  |  40 +-
 .../scala/kafka/api/OffsetFetchRequest.scala    |  20 +-
 core/src/main/scala/kafka/api/RequestKeys.scala |   4 +-
 .../main/scala/kafka/client/ClientUtils.scala   | 100 +++-
 .../main/scala/kafka/cluster/Partition.scala    |  28 +-
 ...nsumerCoordinatorNotAvailableException.scala |  22 +
 .../main/scala/kafka/common/ErrorMapping.scala  |   8 +-
 .../NotCoordinatorForConsumerException.scala    |  22 +
 .../kafka/common/OffsetMetadataAndError.scala   |  41 +-
 .../common/OffsetsLoadInProgressException.scala |  26 +
 core/src/main/scala/kafka/common/Topic.scala    |   4 +
 .../scala/kafka/consumer/ConsoleConsumer.scala  |  16 +-
 .../scala/kafka/consumer/ConsumerConfig.scala   |  39 ++
 .../kafka/consumer/ConsumerConnector.scala      |   2 +-
 .../scala/kafka/consumer/SimpleConsumer.scala   |  11 +-
 .../main/scala/kafka/consumer/TopicCount.scala  |  14 +-
 .../main/scala/kafka/consumer/TopicFilter.scala |  11 +-
 .../consumer/ZookeeperConsumerConnector.scala   | 300 +++++++++---
 .../kafka/controller/KafkaController.scala      |   4 +-
 .../javaapi/ConsumerMetadataResponse.scala      |  42 ++
 .../kafka/javaapi/OffsetCommitRequest.scala     |  13 +-
 .../kafka/javaapi/OffsetCommitResponse.scala    |   2 +-
 .../javaapi/consumer/ConsumerConnector.java     |   6 +-
 .../consumer/ZookeeperConsumerConnector.scala   |   8 +-
 .../main/scala/kafka/log/FileMessageSet.scala   |   4 +-
 .../scala/kafka/producer/ProducerConfig.scala   |  11 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 324 +++++++------
 .../main/scala/kafka/server/KafkaConfig.scala   |  44 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  30 +-
 .../main/scala/kafka/server/OffsetManager.scala | 480 +++++++++++++++++++
 .../scala/kafka/server/ReplicaManager.scala     |  19 +-
 .../kafka/tools/ConsumerOffsetChecker.scala     |  93 +++-
 .../scala/kafka/tools/DumpLogSegments.scala     |   8 +-
 .../main/scala/kafka/tools/MirrorMaker.scala    |   1 -
 .../kafka/tools/ReplicaVerificationTool.scala   |   5 +-
 .../kafka/tools/VerifyConsumerRebalance.scala   |   4 +-
 .../kafka/utils/VerifiableProperties.scala      |  20 +
 core/src/main/scala/kafka/utils/ZkUtils.scala   |   4 +-
 .../scala/other/kafka/TestOffsetManager.scala   | 291 +++++++++++
 .../other/kafka/TestZKConsumerOffsets.scala     |  73 ---
 .../unit/kafka/admin/DeleteTopicTest.scala      |   7 -
 .../api/RequestResponseSerializationTest.scala  | 155 ++----
 .../unit/kafka/consumer/TopicFilterTest.scala   |  24 +-
 .../ZookeeperConsumerConnectorTest.scala        |   4 +-
 .../unit/kafka/server/OffsetCommitTest.scala    | 111 ++---
 .../unit/kafka/server/SimpleFetchTest.scala     |   9 +-
 .../migration_tool_test.py                      |   2 +-
 system_test/mirror_maker/README                 |  22 -
 system_test/mirror_maker/bin/expected.out       |  18 -
 system_test/mirror_maker/bin/run-test.sh        | 357 --------------
 .../config/blacklisttest.consumer.properties    |  28 --
 .../config/mirror_producer.properties           |  30 --
 .../config/server_source_1_1.properties         |  76 ---
 .../config/server_source_1_2.properties         |  76 ---
 .../config/server_source_2_1.properties         |  76 ---
 .../config/server_source_2_2.properties         |  76 ---
 .../config/server_target_1_1.properties         |  78 ---
 .../config/server_target_1_2.properties         |  78 ---
 .../config/whitelisttest_1.consumer.properties  |  28 --
 .../config/whitelisttest_2.consumer.properties  |  28 --
 .../config/zookeeper_source_1.properties        |  18 -
 .../config/zookeeper_source_2.properties        |  18 -
 .../config/zookeeper_target.properties          |  18 -
 .../mirror_maker_testsuite/mirror_maker_test.py |   2 +-
 .../cluster_config.json                         | 103 ++++
 .../config/console_consumer.properties          |   2 +
 .../config/producer_performance.properties      |   0
 .../config/server.properties                    | 144 ++++++
 .../config/zookeeper.properties                 |  23 +
 .../offset_management_test.py                   | 298 ++++++++++++
 .../testcase_7001/testcase_7001_properties.json |  95 ++++
 .../config/kafka_server_1.properties            | 148 ++++++
 .../config/kafka_server_2.properties            | 148 ++++++
 .../config/kafka_server_3.properties            | 148 ++++++
 .../config/kafka_server_4.properties            | 148 ++++++
 .../testcase_7002/config/zookeeper_0.properties |  24 +
 .../testcase_7002/testcase_7002_properties.json | 127 +++++
 .../replication_testsuite/replica_basic_test.py |   2 +-
 system_test/utils/kafka_system_test_utils.py    | 170 ++++++-
 system_test/utils/testcase_env.py               |   6 +
 84 files changed, 3642 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/admin/TopicCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/TopicCommand.scala b/core/src/main/scala/kafka/admin/TopicCommand.scala
index dc9b092..6fef9df 100644
--- a/core/src/main/scala/kafka/admin/TopicCommand.scala
+++ b/core/src/main/scala/kafka/admin/TopicCommand.scala
@@ -26,6 +26,8 @@ import scala.collection.JavaConversions._
 import kafka.cluster.Broker
 import kafka.log.LogConfig
 import kafka.consumer.Whitelist
+import kafka.server.OffsetManager
+
 
 object TopicCommand {
 
@@ -70,7 +72,7 @@ object TopicCommand {
     if (opts.options.has(opts.topicOpt)) {
       val topicsSpec = opts.options.valueOf(opts.topicOpt)
       val topicsFilter = new Whitelist(topicsSpec)
-      allTopics.filter(topicsFilter.isTopicAllowed)
+      allTopics.filter(topicsFilter.isTopicAllowed(_, excludeInternalTopics = false))
     } else
       allTopics
   }
@@ -104,6 +106,9 @@ object TopicCommand {
         println("Updated config for topic \"%s\".".format(topic))
       }
       if(opts.options.has(opts.partitionsOpt)) {
+        if (topic == OffsetManager.OffsetsTopicName) {
+          throw new IllegalArgumentException("The number of partitions for the offsets topic cannot be changed.")
+        }
         println("WARNING: If partitions are increased for a topic that has a key, the partition " +
           "logic or ordering of the messages will be affected")
         val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
new file mode 100644
index 0000000..dfad6e6
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+import kafka.network.RequestChannel.Response
+import kafka.common.ErrorMapping
+
+/** Defaults and wire decoding for ConsumerMetadataRequest. */
+object ConsumerMetadataRequest {
+  val CurrentVersion = 0.shortValue
+  val DefaultClientId = ""
+
+  /**
+   * Decodes a request from `buffer`. Fields are consumed in the order writeTo emits them:
+   * version id, correlation id, client id, and finally the consumer group.
+   */
+  def readFrom(buffer: ByteBuffer) = {
+    // envelope
+    val version = buffer.getShort
+    val correlation = buffer.getInt
+    val client = ApiUtils.readShortString(buffer)
+
+    // request payload
+    val consumerGroup = ApiUtils.readShortString(buffer)
+
+    ConsumerMetadataRequest(group = consumerGroup,
+                            versionId = version,
+                            correlationId = correlation,
+                            clientId = client)
+  }
+
+}
+
+/**
+ * Request used to look up the coordinator broker for a consumer group; it is answered with a
+ * ConsumerMetadataResponse.
+ *
+ * @param group         consumer group being looked up
+ * @param versionId     version written first in the envelope
+ * @param correlationId id written in the envelope and echoed in the error response built by handleError
+ * @param clientId      client identifier written in the envelope
+ */
+case class ConsumerMetadataRequest(group: String,
+                                   versionId: Short = ConsumerMetadataRequest.CurrentVersion,
+                                   override val correlationId: Int = 0,
+                                   clientId: String = ConsumerMetadataRequest.DefaultClientId)
+  extends RequestOrResponse(Some(RequestKeys.ConsumerMetadataKey), correlationId) {
+
+  /** Number of bytes writeTo puts into the buffer. */
+  def sizeInBytes =
+    2 + /* versionId */
+    4 + /* correlationId */
+    ApiUtils.shortStringLength(clientId) +
+    ApiUtils.shortStringLength(group)
+
+  /** Serializes the envelope followed by the group, in the order readFrom expects. */
+  def writeTo(buffer: ByteBuffer) {
+    // envelope
+    buffer.putShort(versionId)
+    buffer.putInt(correlationId)
+    ApiUtils.writeShortString(buffer, clientId)
+
+    // consumer metadata request
+    ApiUtils.writeShortString(buffer, group)
+  }
+
+  /**
+   * Answers the request with ConsumerCoordinatorNotAvailable, whatever the cause `e` was.
+   * The response carries this request's correlationId so the client can match it to the request.
+   */
+  override def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
+    // return ConsumerCoordinatorNotAvailable for all uncaught errors
+    val errorResponse = ConsumerMetadataResponse(None, ErrorMapping.ConsumerCoordinatorNotAvailableCode, correlationId)
+    requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
+  }
+
+  /** Human-readable summary of the request; `details` is not consulted. */
+  def describe(details: Boolean) = {
+    val consumerMetadataRequest = new StringBuilder
+    consumerMetadataRequest.append("Name: " + this.getClass.getSimpleName)
+    consumerMetadataRequest.append("; Version: " + versionId)
+    consumerMetadataRequest.append("; CorrelationId: " + correlationId)
+    consumerMetadataRequest.append("; ClientId: " + clientId)
+    consumerMetadataRequest.append("; Group: " + group)
+    consumerMetadataRequest.toString()
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
new file mode 100644
index 0000000..6807f98
--- /dev/null
+++ b/core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.api
+
+import java.nio.ByteBuffer
+import kafka.cluster.Broker
+import kafka.common.ErrorMapping
+
+/** Wire decoding for ConsumerMetadataResponse. */
+object ConsumerMetadataResponse {
+  val CurrentVersion = 0
+
+  /**
+   * Decodes a response from `buffer`: correlation id, error code, and a broker that is
+   * read only when the error code is NoError.
+   */
+  def readFrom(buffer: ByteBuffer) = {
+    val correlationId = buffer.getInt
+    val errorCode = buffer.getShort
+
+    // no broker follows the error code on a failed lookup
+    val coordinatorOpt =
+      if (errorCode != ErrorMapping.NoError) None
+      else Some(Broker.readFrom(buffer))
+
+    ConsumerMetadataResponse(coordinatorOpt, errorCode, correlationId)
+  }
+
+}
+
+/**
+ * Response to a ConsumerMetadataRequest.
+ *
+ * @param coordinator   broker reported for the group; serialized only when errorCode is NoError
+ * @param errorCode     ErrorMapping code describing the outcome of the lookup
+ * @param correlationId id written first in the response
+ */
+case class ConsumerMetadataResponse (coordinator: Option[Broker], errorCode: Short, override val correlationId: Int = 0)
+  extends RequestOrResponse(correlationId = correlationId) {
+
+  /**
+   * Number of bytes writeTo puts into the buffer. The coordinator is counted under the same
+   * condition writeTo uses to emit it, so the declared size never exceeds what is written.
+   */
+  def sizeInBytes =
+    4 + /* correlationId */
+    2 + /* error code */
+    (if (errorCode == ErrorMapping.NoError) coordinator.map(_.sizeInBytes).getOrElse(0) else 0)
+
+  /** Serializes correlation id, error code and, on NoError only, the coordinator. */
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(correlationId)
+    buffer.putShort(errorCode)
+    if (errorCode == ErrorMapping.NoError) {
+      // readFrom expects a broker after NoError; a missing coordinator fails here rather than producing a truncated response
+      coordinator.get.writeTo(buffer)
+    }
+  }
+
+  /** Same as toString; `details` is not consulted. */
+  def describe(details: Boolean) = toString
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
index 4d1fa5c..9f6956e 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitRequest.scala
@@ -18,17 +18,20 @@
 package kafka.api
 
 import java.nio.ByteBuffer
-
 import kafka.api.ApiUtils._
-import kafka.utils.Logging
+import kafka.utils.{SystemTime, Logging}
 import kafka.network.{RequestChannel, BoundedByteBufferSend}
-import kafka.common.{ErrorMapping, TopicAndPartition, OffsetMetadataAndError}
+import kafka.common.{OffsetAndMetadata, ErrorMapping, TopicAndPartition}
 import kafka.network.RequestChannel.Response
+import scala.collection._
+
 object OffsetCommitRequest extends Logging {
   val CurrentVersion: Short = 0
   val DefaultClientId = ""
 
   def readFrom(buffer: ByteBuffer): OffsetCommitRequest = {
+    val now = SystemTime.milliseconds
+
     // Read values from the envelope
     val versionId = buffer.getShort
     val correlationId = buffer.getInt
@@ -43,23 +46,45 @@ object OffsetCommitRequest extends Logging {
       (1 to partitionCount).map(_ => {
         val partitionId = buffer.getInt
         val offset = buffer.getLong
+        val timestamp = {
+          val given = buffer.getLong
+          if (given == -1L) now else given
+        }
         val metadata = readShortString(buffer)
-        (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset, metadata))
+        (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset, metadata, timestamp))
       })
     })
-    OffsetCommitRequest(consumerGroupId, Map(pairs:_*), versionId, correlationId, clientId)
+    OffsetCommitRequest(consumerGroupId, mutable.Map(pairs:_*), versionId, correlationId, clientId)
   }
 }
 
 case class OffsetCommitRequest(groupId: String,
-                               requestInfo: Map[TopicAndPartition, OffsetMetadataAndError],
+                               requestInfo: mutable.Map[TopicAndPartition, OffsetAndMetadata],
                                versionId: Short = OffsetCommitRequest.CurrentVersion,
                                override val correlationId: Int = 0,
                                clientId: String = OffsetCommitRequest.DefaultClientId)
     extends RequestOrResponse(Some(RequestKeys.OffsetCommitKey), correlationId) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
-  
+
+  /**
+   * Returns the entries of requestInfo whose metadata is null or no longer than
+   * `maxMetadataSize`; entries with longer metadata are left out.
+   */
+  def filterLargeMetadata(maxMetadataSize: Int) =
+    requestInfo.filter { case (_, offsetAndMetadata) =>
+      val metadata = offsetAndMetadata.metadata
+      !(metadata != null && metadata.length > maxMetadataSize)
+    }
+
+  /**
+   * Builds the OffsetCommitResponse for this request, giving every partition in requestInfo a status code.
+   *
+   * A partition whose metadata is longer than `offsetMetadataMaxSize` gets OffsetMetadataTooLargeCode.
+   * Every other partition gets `errorCode`, except that UnknownTopicOrPartitionCode is reported as
+   * ConsumerCoordinatorNotAvailableCode and NotLeaderForPartitionCode as NotCoordinatorForConsumerCode.
+   */
+  def responseFor(errorCode: Short, offsetMetadataMaxSize: Int) = {
+    // the translation does not depend on the partition, so compute it once
+    val translatedCode =
+      if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode)
+        ErrorMapping.ConsumerCoordinatorNotAvailableCode
+      else if (errorCode == ErrorMapping.NotLeaderForPartitionCode)
+        ErrorMapping.NotCoordinatorForConsumerCode
+      else
+        errorCode
+
+    val commitStatus = requestInfo.map { case (topicAndPartition, offsetAndMetadata) =>
+      val metadata = offsetAndMetadata.metadata
+      val metadataTooLarge = metadata != null && metadata.length > offsetMetadataMaxSize
+      (topicAndPartition, if (metadataTooLarge) ErrorMapping.OffsetMetadataTooLargeCode else translatedCode)
+    }.toMap
+    OffsetCommitResponse(commitStatus, correlationId)
+  }
+
+
   def writeTo(buffer: ByteBuffer) {
     // Write envelope
     buffer.putShort(versionId)
@@ -73,9 +98,10 @@ case class OffsetCommitRequest(groupId: String,
       writeShortString(buffer, t1._1) // topic
       buffer.putInt(t1._2.size)       // number of partitions for this topic
       t1._2.foreach( t2 => {
-        buffer.putInt(t2._1.partition)  // partition
-        buffer.putLong(t2._2.offset)    // offset
-        writeShortString(buffer, t2._2.metadata) // metadata
+        buffer.putInt(t2._1.partition)
+        buffer.putLong(t2._2.offset)
+        buffer.putLong(t2._2.timestamp)
+        writeShortString(buffer, t2._2.metadata)
       })
     })
   }
@@ -95,15 +121,14 @@ case class OffsetCommitRequest(groupId: String,
         innerCount +
         4 /* partition */ +
         8 /* offset */ +
+        8 /* timestamp */ +
         shortStringLength(offsetAndMetadata._2.metadata)
       })
     })
 
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
-    val responseMap = requestInfo.map {
-      case (topicAndPartition, offset) => (topicAndPartition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]))
-    }.toMap
-    val errorResponse = OffsetCommitResponse(requestInfo=responseMap, correlationId=correlationId)
+    val errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+    val errorResponse = responseFor(errorCode, Int.MaxValue)
     requestChannel.sendResponse(new Response(request, new BoundedByteBufferSend(errorResponse)))
   }
 
@@ -119,7 +144,7 @@ case class OffsetCommitRequest(groupId: String,
     offsetCommitRequest.toString()
   }
 
-  override def toString(): String = {
-    describe(true)
+  override def toString = {
+    describe(details = true)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
index 9e1795f..4946e97 100644
--- a/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/api/OffsetCommitResponse.scala
@@ -19,9 +19,8 @@ package kafka.api
 
 import java.nio.ByteBuffer
 
-import kafka.api.ApiUtils._
-import kafka.common.TopicAndPartition
 import kafka.utils.Logging
+import kafka.common.TopicAndPartition
 
 object OffsetCommitResponse extends Logging {
   val CurrentVersion: Short = 0
@@ -30,7 +29,7 @@ object OffsetCommitResponse extends Logging {
     val correlationId = buffer.getInt
     val topicCount = buffer.getInt
     val pairs = (1 to topicCount).flatMap(_ => {
-      val topic = readShortString(buffer)
+      val topic = ApiUtils.readShortString(buffer)
       val partitionCount = buffer.getInt
       (1 to partitionCount).map(_ => {
         val partitionId = buffer.getInt
@@ -42,37 +41,34 @@ object OffsetCommitResponse extends Logging {
   }
 }
 
-case class OffsetCommitResponse(requestInfo: Map[TopicAndPartition, Short],
+case class OffsetCommitResponse(commitStatus: Map[TopicAndPartition, Short],
                                override val correlationId: Int = 0)
     extends RequestOrResponse(correlationId=correlationId) {
 
-  lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_._1.topic)
+  lazy val commitStatusGroupedByTopic = commitStatus.groupBy(_._1.topic)
 
   def writeTo(buffer: ByteBuffer) {
     buffer.putInt(correlationId)
-    buffer.putInt(requestInfoGroupedByTopic.size) // number of topics
-    requestInfoGroupedByTopic.foreach( t1 => { // topic -> Map[TopicAndPartition, Short]
-      writeShortString(buffer, t1._1) // topic
-      buffer.putInt(t1._2.size)       // number of partitions for this topic
-      t1._2.foreach( t2 => {  // TopicAndPartition -> Short
-        buffer.putInt(t2._1.partition)
-        buffer.putShort(t2._2)  //error
-      })
-    })
+    buffer.putInt(commitStatusGroupedByTopic.size)
+    commitStatusGroupedByTopic.foreach { case(topic, statusMap) =>
+      ApiUtils.writeShortString(buffer, topic)
+      buffer.putInt(statusMap.size) // partition count
+      statusMap.foreach { case(topicAndPartition, errorCode) =>
+        buffer.putInt(topicAndPartition.partition)
+        buffer.putShort(errorCode)
+      }
+    }
   }
 
   override def sizeInBytes = 
     4 + /* correlationId */
     4 + /* topic count */
-    requestInfoGroupedByTopic.foldLeft(0)((count, topicAndOffsets) => {
-      val (topic, offsets) = topicAndOffsets
+    commitStatusGroupedByTopic.foldLeft(0)((count, partitionStatusMap) => {
+      val (topic, partitionStatus) = partitionStatusMap
       count +
-      shortStringLength(topic) + /* topic */
-      4 + /* number of partitions */
-      offsets.size * (
-        4 + /* partition */
-        2 /* error */
-      )
+      ApiUtils.shortStringLength(topic) +
+      4 + /* partition count */
+      partitionStatus.size * ( 4 /* partition */  + 2 /* error code */)
     })
 
   override def describe(details: Boolean):String = { toString }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
index 7036532..a32f858 100644
--- a/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
+++ b/core/src/main/scala/kafka/api/OffsetFetchRequest.scala
@@ -23,7 +23,7 @@ import kafka.api.ApiUtils._
 import kafka.utils.Logging
 import kafka.network.{BoundedByteBufferSend, RequestChannel}
 import kafka.network.RequestChannel.Response
-import kafka.common.{ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
+import kafka.common.{OffsetAndMetadata, ErrorMapping, OffsetMetadataAndError, TopicAndPartition}
 object OffsetFetchRequest extends Logging {
   val CurrentVersion: Short = 0
   val DefaultClientId = ""
@@ -50,10 +50,10 @@ object OffsetFetchRequest extends Logging {
 }
 
 case class OffsetFetchRequest(groupId: String,
-                               requestInfo: Seq[TopicAndPartition],
-                               versionId: Short = OffsetFetchRequest.CurrentVersion,
-                               override val correlationId: Int = 0,
-                               clientId: String = OffsetFetchRequest.DefaultClientId)
+                              requestInfo: Seq[TopicAndPartition],
+                              versionId: Short = OffsetFetchRequest.CurrentVersion,
+                              override val correlationId: Int = 0,
+                              clientId: String = OffsetFetchRequest.DefaultClientId)
     extends RequestOrResponse(Some(RequestKeys.OffsetFetchKey), correlationId) {
 
   lazy val requestInfoGroupedByTopic = requestInfo.groupBy(_.topic)
@@ -91,8 +91,8 @@ case class OffsetFetchRequest(groupId: String,
   override  def handleError(e: Throwable, requestChannel: RequestChannel, request: RequestChannel.Request): Unit = {
     val responseMap = requestInfo.map {
       case (topicAndPartition) => (topicAndPartition, OffsetMetadataAndError(
-        offset=OffsetMetadataAndError.InvalidOffset,
-        error=ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
+        offset = OffsetAndMetadata.InvalidOffset,
+        error = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])
       ))
     }.toMap
     val errorResponse = OffsetFetchResponse(requestInfo=responseMap, correlationId=correlationId)
@@ -111,7 +111,7 @@ case class OffsetFetchRequest(groupId: String,
     offsetFetchRequest.toString()
   }
 
-  override def toString(): String = {
-    describe(true)
+  override def toString: String = {
+    describe(details = true)
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/api/RequestKeys.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/RequestKeys.scala b/core/src/main/scala/kafka/api/RequestKeys.scala
index c81214f..fbfc9d3 100644
--- a/core/src/main/scala/kafka/api/RequestKeys.scala
+++ b/core/src/main/scala/kafka/api/RequestKeys.scala
@@ -31,6 +31,7 @@ object RequestKeys {
   val ControlledShutdownKey: Short = 7
   val OffsetCommitKey: Short = 8
   val OffsetFetchKey: Short = 9
+  val ConsumerMetadataKey: Short = 10
 
   val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
     Map(ProduceKey -> ("Produce", ProducerRequest.readFrom),
@@ -42,7 +43,8 @@ object RequestKeys {
         UpdateMetadataKey -> ("UpdateMetadata", UpdateMetadataRequest.readFrom),
         ControlledShutdownKey -> ("ControlledShutdown", ControlledShutdownRequest.readFrom),
         OffsetCommitKey -> ("OffsetCommit", OffsetCommitRequest.readFrom),
-        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom))
+        OffsetFetchKey -> ("OffsetFetch", OffsetFetchRequest.readFrom),
+        ConsumerMetadataKey -> ("ConsumerMetadata", ConsumerMetadataRequest.readFrom))
 
   def nameForKey(key: Short): String = {
     keyToNameAndDeserializerMap.get(key) match {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/client/ClientUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/client/ClientUtils.scala b/core/src/main/scala/kafka/client/ClientUtils.scala
index 1d2f81b..fc9e084 100644
--- a/core/src/main/scala/kafka/client/ClientUtils.scala
+++ b/core/src/main/scala/kafka/client/ClientUtils.scala
@@ -20,12 +20,16 @@ import scala.collection._
 import kafka.cluster._
 import kafka.api._
 import kafka.producer._
-import kafka.common.KafkaException
+import kafka.common.{ErrorMapping, KafkaException}
 import kafka.utils.{Utils, Logging}
 import java.util.Properties
 import util.Random
+ import kafka.network.BlockingChannel
+ import kafka.utils.ZkUtils._
+ import org.I0Itec.zkclient.ZkClient
+ import java.io.IOException
 
-/**
+ /**
  * Helper functions common to clients (producer, consumer, or admin)
  */
 object ClientUtils extends Logging{
@@ -103,5 +107,93 @@ object ClientUtils extends Logging{
       new Broker(brokerId, hostName, port)
     })
   }
-  
-}
\ No newline at end of file
+
+   /**
+    * Creates a blocking channel to a random broker. The broker list is re-read and tried in
+    * shuffled order, pass after pass, until one connection attempt succeeds.
+    */
+   def channelToAnyBroker(zkClient: ZkClient, socketTimeoutMs: Int = 3000) : BlockingChannel = {
+     var channel: BlockingChannel = null
+     while (channel == null) {
+       // fresh, randomly ordered candidate list for every pass
+       val candidates = Random.shuffle(getAllBrokersInCluster(zkClient)).iterator
+       while (channel == null && candidates.hasNext) {
+         val broker = candidates.next()
+         trace("Connecting to broker %s:%d.".format(broker.host, broker.port))
+         try {
+           channel = new BlockingChannel(broker.host, broker.port, BlockingChannel.UseDefaultBufferSize, BlockingChannel.UseDefaultBufferSize, socketTimeoutMs)
+           channel.connect()
+           debug("Created channel to broker %s:%d.".format(channel.host, channel.port))
+         } catch {
+           case e: Exception =>
+             // discard the failed channel so the search moves on to the next candidate
+             if (channel != null) channel.disconnect()
+             channel = null
+             info("Error while creating channel to %s:%d.".format(broker.host, broker.port))
+         }
+       }
+     }
+
+     channel
+   }
+
+   /**
+    * Creates a blocking channel to the offset manager of the given group. Retries indefinitely:
+    * the call returns only once a channel to the offset manager has been obtained.
+    *
+    * @param group           consumer group whose offset manager is looked up
+    * @param zkClient        ZooKeeper client passed to channelToAnyBroker to pick a broker to query
+    * @param socketTimeoutMs socket timeout for every channel opened here, including the query channel
+    *                        (which is returned as-is when the queried broker is the offset manager)
+    * @param retryBackOffMs  pause before retrying after a lookup that returned an error code, or after
+    *                        a failed connection to the offset manager
+    * @return the channel to the group's offset manager
+    */
+   def channelToOffsetManager(group: String, zkClient: ZkClient, socketTimeoutMs: Int = 3000, retryBackOffMs: Int = 1000) = {
+     var queryChannel = channelToAnyBroker(zkClient, socketTimeoutMs)
+
+     var offsetManagerChannelOpt: Option[BlockingChannel] = None
+
+     while (!offsetManagerChannelOpt.isDefined) {
+
+       var coordinatorOpt: Option[Broker] = None
+
+       while (!coordinatorOpt.isDefined) {
+         try {
+           if (!queryChannel.isConnected)
+             queryChannel = channelToAnyBroker(zkClient, socketTimeoutMs)
+           debug("Querying %s:%d to locate offset manager for %s.".format(queryChannel.host, queryChannel.port, group))
+           queryChannel.send(ConsumerMetadataRequest(group))
+           val response = queryChannel.receive()
+           val consumerMetadataResponse =  ConsumerMetadataResponse.readFrom(response.buffer)
+           debug("Consumer metadata response: " + consumerMetadataResponse.toString)
+           if (consumerMetadataResponse.errorCode == ErrorMapping.NoError)
+             coordinatorOpt = consumerMetadataResponse.coordinator
+           else {
+             // back off instead of re-sending the same query in a tight loop
+             debug("Query to %s:%d to locate offset manager for %s failed - will retry in %d milliseconds."
+                   .format(queryChannel.host, queryChannel.port, group, retryBackOffMs))
+             Thread.sleep(retryBackOffMs)
+           }
+         }
+         catch {
+           case ioe: IOException =>
+             // dropping the channel makes the next iteration pick a broker again
+             info("Failed to fetch consumer metadata from %s:%d.".format(queryChannel.host, queryChannel.port))
+             queryChannel.disconnect()
+         }
+       }
+
+       val coordinator = coordinatorOpt.get
+       if (coordinator.host == queryChannel.host && coordinator.port == queryChannel.port) {
+         // the broker we queried is the offset manager: reuse the already open channel
+         offsetManagerChannelOpt = Some(queryChannel)
+       } else {
+         val connectString = "%s:%d".format(coordinator.host, coordinator.port)
+         var offsetManagerChannel: BlockingChannel = null
+         try {
+           debug("Connecting to offset manager %s.".format(connectString))
+           offsetManagerChannel = new BlockingChannel(coordinator.host, coordinator.port,
+                                                      BlockingChannel.UseDefaultBufferSize,
+                                                      BlockingChannel.UseDefaultBufferSize,
+                                                      socketTimeoutMs)
+           offsetManagerChannel.connect()
+           offsetManagerChannelOpt = Some(offsetManagerChannel)
+           queryChannel.disconnect()
+         }
+         catch {
+           case ioe: IOException => // offsets manager may have moved
+             info("Error while connecting to %s.".format(connectString))
+             if (offsetManagerChannel != null) offsetManagerChannel.disconnect()
+             Thread.sleep(retryBackOffMs)
+             offsetManagerChannelOpt = None // just in case someone decides to change shutdownChannel to not swallow exceptions
+         }
+       }
+     }
+
+     offsetManagerChannelOpt.get
+   }
+ }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 882b6da..0b88f14 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -22,7 +22,7 @@ import kafka.utils._
 import java.lang.Object
 import kafka.api.{PartitionStateInfo, LeaderAndIsr}
 import kafka.log.LogConfig
-import kafka.server.ReplicaManager
+import kafka.server.{OffsetManager, ReplicaManager}
 import com.yammer.metrics.core.Gauge
 import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
@@ -165,7 +165,8 @@ class Partition(val topic: String,
    *  and setting the new leader and ISR
    */
   def makeLeader(controllerId: Int,
-                 partitionStateInfo: PartitionStateInfo, correlationId: Int): Boolean = {
+                 partitionStateInfo: PartitionStateInfo, correlationId: Int,
+                 offsetManager: OffsetManager): Boolean = {
     leaderIsrUpdateLock synchronized {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -186,6 +187,8 @@ class Partition(val topic: String,
       leaderReplicaIdOpt = Some(localBrokerId)
       // we may need to increment high watermark since ISR could be down to 1
       maybeIncrementLeaderHW(getReplica().get)
+      if (topic == OffsetManager.OffsetsTopicName)
+        offsetManager.loadOffsetsFromLog(partitionId)
       true
     }
   }
@@ -196,7 +199,7 @@ class Partition(val topic: String,
    */
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo,
-                   correlationId: Int): Boolean = {
+                   correlationId: Int, offsetManager: OffsetManager): Boolean = {
     leaderIsrUpdateLock synchronized {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -212,12 +215,21 @@ class Partition(val topic: String,
       inSyncReplicas = Set.empty[Replica]
       leaderEpoch = leaderAndIsr.leaderEpoch
       zkVersion = leaderAndIsr.zkVersion
+      
+      leaderReplicaIdOpt.foreach { leaderReplica =>
+        if (topic == OffsetManager.OffsetsTopicName &&
+           /* if we are making a leader->follower transition */
+           leaderReplica == localBrokerId)
+          offsetManager.clearOffsetsInPartition(partitionId)
+      }
 
-      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId)
-        return false;
-
-      leaderReplicaIdOpt = Some(newLeaderBrokerId)
-      true
+      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
+        false
+      }
+      else {
+        leaderReplicaIdOpt = Some(newLeaderBrokerId)
+        true
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala b/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
new file mode 100644
index 0000000..8e02d26
--- /dev/null
+++ b/core/src/main/scala/kafka/common/ConsumerCoordinatorNotAvailableException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class ConsumerCoordinatorNotAvailableException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/ErrorMapping.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/ErrorMapping.scala b/core/src/main/scala/kafka/common/ErrorMapping.scala
index b0b5dce..5559d26 100644
--- a/core/src/main/scala/kafka/common/ErrorMapping.scala
+++ b/core/src/main/scala/kafka/common/ErrorMapping.scala
@@ -43,6 +43,9 @@ object ErrorMapping {
   val StaleControllerEpochCode: Short = 11
   val OffsetMetadataTooLargeCode: Short = 12
   val StaleLeaderEpochCode: Short = 13
+  val OffsetsLoadInProgressCode: Short = 14
+  val ConsumerCoordinatorNotAvailableCode: Short = 15
+  val NotCoordinatorForConsumerCode: Short = 16
 
   private val exceptionToCode = 
     Map[Class[Throwable], Short](
@@ -57,7 +60,10 @@ object ErrorMapping {
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode,
       classOf[MessageSizeTooLargeException].asInstanceOf[Class[Throwable]] -> MessageSizeTooLargeCode,
       classOf[ControllerMovedException].asInstanceOf[Class[Throwable]] -> StaleControllerEpochCode,
-      classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode
+      classOf[OffsetMetadataTooLargeException].asInstanceOf[Class[Throwable]] -> OffsetMetadataTooLargeCode,
+      classOf[OffsetsLoadInProgressException].asInstanceOf[Class[Throwable]] -> OffsetsLoadInProgressCode,
+      classOf[ConsumerCoordinatorNotAvailableException].asInstanceOf[Class[Throwable]] -> ConsumerCoordinatorNotAvailableCode,
+      classOf[NotCoordinatorForConsumerException].asInstanceOf[Class[Throwable]] -> NotCoordinatorForConsumerCode
     ).withDefaultValue(UnknownCode)
   
   /* invert the mapping */

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala b/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala
new file mode 100644
index 0000000..1eb74be
--- /dev/null
+++ b/core/src/main/scala/kafka/common/NotCoordinatorForConsumerException.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class NotCoordinatorForConsumerException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 59608a3..1586243 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -1,5 +1,3 @@
-package kafka.common
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -17,20 +15,41 @@ package kafka.common
  * limitations under the License.
  */
 
-/**
- * Convenience case class since (topic, partition) pairs are ubiquitous.
- */
-case class OffsetMetadataAndError(offset: Long, metadata: String = OffsetMetadataAndError.NoMetadata, error: Short = ErrorMapping.NoError) {
+package kafka.common
 
-  def this(tuple: (Long, String, Short)) = this(tuple._1, tuple._2, tuple._3)
+case class OffsetAndMetadata(offset: Long,
+                             metadata: String = OffsetAndMetadata.NoMetadata,
+                             timestamp: Long = -1L) {
+  override def toString = "OffsetAndMetadata[%d,%s%s]"
+                          .format(offset,
+                                  if (metadata != null && metadata.length > 0) metadata else "NO_METADATA",
+                                  if (timestamp == -1) "" else "," + timestamp.toString)
+}
 
-  def asTuple = (offset, metadata, error)
+object OffsetAndMetadata {
+  val InvalidOffset: Long = -1L
+  val NoMetadata: String = ""
+  val InvalidTime: Long = -1L
+}
+
+case class OffsetMetadataAndError(offset: Long,
+                                  metadata: String = OffsetAndMetadata.NoMetadata,
+                                  error: Short = ErrorMapping.NoError) {
+
+  def this(offsetMetadata: OffsetAndMetadata, error: Short) =
+    this(offsetMetadata.offset, offsetMetadata.metadata, error)
 
-  override def toString = "OffsetAndMetadata[%d,%s,%d]".format(offset, metadata, error)
+  def this(error: Short) =
+    this(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, error)
 
+  def asTuple = (offset, metadata, error)
+
+  override def toString = "OffsetMetadataAndError[%d,%s,%d]".format(offset, metadata, error)
 }
 
 object OffsetMetadataAndError {
-  val InvalidOffset: Long = -1L;
-  val NoMetadata: String = "";
+  val NoOffset = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError)
+  val OffsetsLoading = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.OffsetsLoadInProgressCode)
+  val NotOffsetManagerForGroup = OffsetMetadataAndError(OffsetAndMetadata.InvalidOffset, OffsetAndMetadata.NoMetadata, ErrorMapping.NotCoordinatorForConsumerCode)
 }
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala b/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala
new file mode 100644
index 0000000..1c8e96e
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetsLoadInProgressException.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * Indicates that offsets are currently being loaded from disk into the cache so offset fetch requests cannot be satisfied.
+ */
+class OffsetsLoadInProgressException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/common/Topic.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/common/Topic.scala b/core/src/main/scala/kafka/common/Topic.scala
index c1b9f65..ad75978 100644
--- a/core/src/main/scala/kafka/common/Topic.scala
+++ b/core/src/main/scala/kafka/common/Topic.scala
@@ -18,12 +18,16 @@
 package kafka.common
 
 import util.matching.Regex
+import kafka.server.OffsetManager
+
 
 object Topic {
   val legalChars = "[a-zA-Z0-9\\._\\-]"
   private val maxNameLength = 255
   private val rgx = new Regex(legalChars + "+")
 
+  val InternalTopics = Set(OffsetManager.OffsetsTopicName)
+
   def validate(topic: String) {
     if (topic.length <= 0)
       throw new InvalidTopicException("topic name is illegal, can't be empty")

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
index dc066c2..0f62819 100644
--- a/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
@@ -123,7 +123,13 @@ object ConsoleConsumer extends Logging {
       .withRequiredArg
       .describedAs("metrics dictory")
       .ofType(classOf[java.lang.String])
-
+    val includeInternalTopicsOpt = parser.accepts("include-internal-topics", "Allow consuming internal topics.")
+    val offsetsStorageOpt = parser.accepts("offsets-storage", "Specify offsets storage backend (kafka/zookeeper).")
+            .withRequiredArg
+            .describedAs("Offsets storage method.")
+            .ofType(classOf[String])
+            .defaultsTo("zookeeper")
+    val dualCommitEnabledOpt = parser.accepts("dual-commit-enabled", "If offsets storage is kafka and this is set, then commit to zookeeper as well.")
 
     val options: OptionSet = tryParse(parser, args)
     CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
@@ -153,6 +159,7 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
+    val offsetsStorage = options.valueOf(offsetsStorageOpt)
     val props = new Properties()
     props.put("group.id", options.valueOf(groupIdOpt))
     props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
@@ -166,6 +173,13 @@ object ConsoleConsumer extends Logging {
     props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
     props.put("consumer.timeout.ms", options.valueOf(consumerTimeoutMsOpt).toString)
     props.put("refresh.leader.backoff.ms", options.valueOf(refreshMetadataBackoffMsOpt).toString)
+    props.put("offsets.storage", offsetsStorage)
+    if (options.has(includeInternalTopicsOpt))
+      props.put("exclude.internal.topics", "false")
+    if (options.has(dualCommitEnabledOpt))
+      props.put("dual.commit.enabled", "true")
+    else
+      props.put("dual.commit.enabled", "false")
 
     val config = new ConsumerConfig(props)
     val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index e6875d6..1cf2f62 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -41,9 +41,14 @@ object ConsumerConfig extends Config {
   val MirrorTopicsWhitelist = ""
   val MirrorTopicsBlacklist = ""
   val MirrorConsumerNumThreads = 1
+  val OffsetsChannelBackoffMs = 1000
+  val OffsetsChannelSocketTimeoutMs = 10000
+  val OffsetsCommitMaxRetries = 5
+  val OffsetsStorage = "zookeeper"
 
   val MirrorTopicsWhitelistProp = "mirror.topics.whitelist"
   val MirrorTopicsBlacklistProp = "mirror.topics.blacklist"
+  val ExcludeInternalTopics = true
   val MirrorConsumerNumThreadsProp = "mirror.consumer.numthreads"
   val DefaultClientId = ""
 
@@ -51,6 +56,7 @@ object ConsumerConfig extends Config {
     validateClientId(config.clientId)
     validateGroupId(config.groupId)
     validateAutoOffsetReset(config.autoOffsetReset)
+    validateOffsetsStorage(config.offsetsStorage)
   }
 
   def validateClientId(clientId: String) {
@@ -69,6 +75,15 @@ object ConsumerConfig extends Config {
                                                  "Valid values are " + OffsetRequest.SmallestTimeString + " and " + OffsetRequest.LargestTimeString)
     }
   }
+
+  def validateOffsetsStorage(storage: String) {
+    storage match {
+      case "zookeeper" =>
+      case "kafka" =>
+      case _ => throw new InvalidConfigException("Wrong value " + storage + " of offsets.storage in consumer config; " +
+                                                 "Valid values are 'zookeeper' and 'kafka'")
+    }
+  }
 }
 
 class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(props) {
@@ -122,6 +137,27 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
   /** backoff time to refresh the leader of a partition after it loses the current leader */
   val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
 
+  /** backoff time to reconnect the offsets channel or to retry offset fetches/commits */
+  val offsetsChannelBackoffMs = props.getInt("offsets.channel.backoff.ms", OffsetsChannelBackoffMs)
+  /** socket timeout to use when reading responses for Offset Fetch/Commit requests. This timeout will also be used for
+   *  the ConsumerMetadata requests that are used to query for the offset coordinator. */
+  val offsetsChannelSocketTimeoutMs = props.getInt("offsets.channel.socket.timeout.ms", OffsetsChannelSocketTimeoutMs)
+
+  /** Retry the offset commit up to this many times on failure. This retry count only applies to offset commits during
+    * shut-down. It does not apply to commits from the auto-commit thread. It also does not apply to attempts to query
+    * for the offset coordinator before committing offsets. i.e., if a consumer metadata request fails for any reason,
+    * it is retried and that retry does not count toward this limit. */
+  val offsetsCommitMaxRetries = props.getInt("offsets.commit.max.retries", OffsetsCommitMaxRetries)
+
+  /** Specify whether offsets should be committed to "zookeeper" (default) or "kafka" */
+  val offsetsStorage = props.getString("offsets.storage", OffsetsStorage).toLowerCase
+
+  /** If you are using "kafka" as offsets.storage, you can dual commit offsets to ZooKeeper (in addition to Kafka). This
+    * is required during migration from zookeeper-based offset storage to kafka-based offset storage. With respect to any
+    * given consumer group, it is safe to turn this off after all instances within that group have been migrated to
+    * the new jar that commits offsets to the broker (instead of directly to ZooKeeper). */
+  val dualCommitEnabled = props.getBoolean("dual.commit.enabled", if (offsetsStorage == "kafka") true else false)
+
   /* what to do if an offset is out of range.
      smallest : automatically reset the offset to the smallest offset
      largest : automatically reset the offset to the largest offset
@@ -136,6 +172,9 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig(
    */
   val clientId = props.getString("client.id", groupId)
 
+  /** Whether messages from internal topics (such as offsets) should be exposed to the consumer. */
+  val excludeInternalTopics = props.getBoolean("exclude.internal.topics", ExcludeInternalTopics)
+
   validate(this)
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
index 13c3f77..07677c1 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
@@ -70,7 +70,7 @@ trait ConsumerConnector {
   /**
    *  Commit the offsets of all broker partitions connected by this connector.
    */
-  def commitOffsets
+  def commitOffsets(retryOnFailure: Boolean = true)
   
   /**
    *  Shut down the connector

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index fa7caa7..0e64632 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -92,6 +92,11 @@ class SimpleConsumer(val host: String,
     TopicMetadataResponse.readFrom(response.buffer)
   }
 
+  def send(request: ConsumerMetadataRequest): ConsumerMetadataResponse = {
+    val response = sendRequest(request)
+    ConsumerMetadataResponse.readFrom(response.buffer)
+  }
+
   /**
    *  Fetch a set of messages from a topic.
    *
@@ -126,7 +131,11 @@ class SimpleConsumer(val host: String,
    * @param request a [[kafka.api.OffsetCommitRequest]] object.
    * @return a [[kafka.api.OffsetCommitResponse]] object.
    */
-  def commitOffsets(request: OffsetCommitRequest) = OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+  def commitOffsets(request: OffsetCommitRequest) = {
+    // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before
+    // we can commit offsets.
+    OffsetCommitResponse.readFrom(sendRequest(request).buffer)
+  }
 
   /**
    * Fetch offsets for a topic

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/TopicCount.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicCount.scala b/core/src/main/scala/kafka/consumer/TopicCount.scala
index e332633..c793110 100644
--- a/core/src/main/scala/kafka/consumer/TopicCount.scala
+++ b/core/src/main/scala/kafka/consumer/TopicCount.scala
@@ -47,7 +47,7 @@ private[kafka] object TopicCount extends Logging {
   val blackListPattern = "black_list"
   val staticPattern = "static"
 
-  def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient) : TopicCount = {
+  def constructTopicCount(group: String, consumerId: String, zkClient: ZkClient, excludeInternalTopics: Boolean) : TopicCount = {
     val dirs = new ZKGroupDirs(group)
     val topicCountString = ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1
     var subscriptionPattern: String = null
@@ -85,15 +85,15 @@ private[kafka] object TopicCount extends Logging {
           new Whitelist(regex)
         else
           new Blacklist(regex)
-      new WildcardTopicCount(zkClient, consumerId, filter, numStreams)
+      new WildcardTopicCount(zkClient, consumerId, filter, numStreams, excludeInternalTopics)
     }
   }
 
   def constructTopicCount(consumerIdString: String, topicCount: Map[String, Int]) =
     new StaticTopicCount(consumerIdString, topicCount)
 
-  def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkClient: ZkClient) =
-    new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams)
+  def constructTopicCount(consumerIdString: String, filter: TopicFilter, numStreams: Int, zkClient: ZkClient, excludeInternalTopics: Boolean) =
+    new WildcardTopicCount(zkClient, consumerIdString, filter, numStreams, excludeInternalTopics)
 
 }
 
@@ -119,9 +119,11 @@ private[kafka] class StaticTopicCount(val consumerIdString: String,
 private[kafka] class WildcardTopicCount(zkClient: ZkClient,
                                         consumerIdString: String,
                                         topicFilter: TopicFilter,
-                                        numStreams: Int) extends TopicCount {
+                                        numStreams: Int,
+                                        excludeInternalTopics: Boolean) extends TopicCount {
   def getConsumerThreadIdsPerTopic = {
-    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath).filter(topicFilter.isTopicAllowed(_))
+    val wildcardTopics = ZkUtils.getChildrenParentMayNotExist(zkClient, ZkUtils.BrokerTopicsPath)
+                         .filter(topic => topicFilter.isTopicAllowed(topic, excludeInternalTopics))
     makeConsumerThreadIdsPerTopic(consumerIdString, Map(wildcardTopics.map((_, numStreams)): _*))
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/TopicFilter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/TopicFilter.scala b/core/src/main/scala/kafka/consumer/TopicFilter.scala
index 4f20823..5a13540 100644
--- a/core/src/main/scala/kafka/consumer/TopicFilter.scala
+++ b/core/src/main/scala/kafka/consumer/TopicFilter.scala
@@ -20,6 +20,7 @@ package kafka.consumer
 
 import kafka.utils.Logging
 import java.util.regex.{PatternSyntaxException, Pattern}
+import kafka.common.Topic
 
 
 sealed abstract class TopicFilter(rawRegex: String) extends Logging {
@@ -41,12 +42,12 @@ sealed abstract class TopicFilter(rawRegex: String) extends Logging {
 
   override def toString = regex
 
-  def isTopicAllowed(topic: String): Boolean
+  def isTopicAllowed(topic: String, excludeInternalTopics: Boolean): Boolean
 }
 
 case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def isTopicAllowed(topic: String) = {
-    val allowed = topic.matches(regex)
+  override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
+    val allowed = topic.matches(regex) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))
@@ -58,8 +59,8 @@ case class Whitelist(rawRegex: String) extends TopicFilter(rawRegex) {
 }
 
 case class Blacklist(rawRegex: String) extends TopicFilter(rawRegex) {
-  override def isTopicAllowed(topic: String) = {
-    val allowed = !topic.matches(regex)
+  override def isTopicAllowed(topic: String, excludeInternalTopics: Boolean) = {
+    val allowed = (!topic.matches(regex)) && !(Topic.InternalTopics.contains(topic) && excludeInternalTopics)
 
     debug("%s %s".format(
       topic, if (allowed) "allowed" else "filtered"))

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index 703b2e2..9a3db90 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -34,7 +34,11 @@ import kafka.utils.Utils.inLock
 import kafka.common._
 import com.yammer.metrics.core.Gauge
 import kafka.metrics._
+import kafka.network.BlockingChannel
+import kafka.client.ClientUtils
+import kafka.api._
 import scala.Some
+import kafka.common.TopicAndPartition
 
 
 /**
@@ -85,7 +89,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-  private var checkpointedOffsets = new Pool[TopicAndPartition, Long]
+  private val checkpointedOffsets = new Pool[TopicAndPartition, Long]
   private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
   private val messageStreamCreated = new AtomicBoolean(false)
@@ -94,8 +98,15 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   private var topicPartitionChangeListener: ZKTopicPartitionChangeListener = null
   private var loadBalancerListener: ZKRebalancerListener = null
 
+  private var offsetsChannel: BlockingChannel = null
+  private val offsetsChannelLock = new Object
+
   private var wildcardTopicWatcher: ZookeeperTopicEventWatcher = null
 
+  // useful for tracking migration of consumers to store offsets in kafka
+  private val kafkaCommitMeter = newMeter(config.clientId + "-KafkaCommitsPerSec", "commits", TimeUnit.SECONDS)
+  private val zkCommitMeter = newMeter(config.clientId + "-ZooKeeperCommitsPerSec", "commits", TimeUnit.SECONDS)
+
   val consumerIdString = {
     var consumerUuid : String = null
     config.consumerId match {
@@ -113,6 +124,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
   connectZk()
   createFetcher()
+  ensureOffsetManagerConnected()
+
   if (config.autoCommitEnable) {
     scheduler.startup
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
@@ -156,12 +169,22 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     zkClient = new ZkClient(config.zkConnect, config.zkSessionTimeoutMs, config.zkConnectionTimeoutMs, ZKStringSerializer)
   }
 
+  // Blocks until the offset manager is located and a channel is established to it.
+  private def ensureOffsetManagerConnected() {
+    if (config.offsetsStorage == "kafka") {
+      if (offsetsChannel == null || !offsetsChannel.isConnected)
+        offsetsChannel = ClientUtils.channelToOffsetManager(config.groupId, zkClient, config.offsetsChannelSocketTimeoutMs, config.offsetsChannelBackoffMs)
+
+      debug("Connected to offset manager %s:%d.".format(offsetsChannel.host, offsetsChannel.port))
+    }
+  }
+
   def shutdown() {
-    rebalanceLock synchronized {
-      val canShutdown = isShuttingDown.compareAndSet(false, true);
-      if (canShutdown) {
-        info("ZKConsumerConnector shutting down")
+    val canShutdown = isShuttingDown.compareAndSet(false, true)
+    if (canShutdown) {
+      info("ZKConsumerConnector shutting down")
 
+      rebalanceLock synchronized {
         if (wildcardTopicWatcher != null)
           wildcardTopicWatcher.shutdown()
         try {
@@ -178,6 +201,8 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
             zkClient.close()
             zkClient = null
           }
+
+          if (offsetsChannel != null) offsetsChannel.disconnect()
         } catch {
           case e: Throwable =>
             fatal("error during consumer connector shutdown", e)
@@ -240,7 +265,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
   def autoCommit() {
     trace("auto committing")
     try {
-      commitOffsets()
+      commitOffsets(isAutoCommit = false)
     }
     catch {
       case t: Throwable =>
@@ -249,30 +274,184 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
   }
 
-  def commitOffsets() {
-    if (zkClient == null) {
-      error("zk client is null. Cannot commit offsets")
-      return
+  def commitOffsetToZooKeeper(topicPartition: TopicAndPartition, offset: Long) { // writes the offset (as a string) to <consumerOffsetDir>/<partition> for this group and topic
+    val topicDirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
+    updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + topicPartition.partition, offset.toString)
+    zkCommitMeter.mark() // one mark per offset written to ZooKeeper
+  }
+
+  def commitOffsets(isAutoCommit: Boolean = true) { // commits offsets that changed since the last checkpoint, to ZooKeeper or to the Kafka offset manager depending on config.offsetsStorage
+    var retriesRemaining = 1 + (if (isAutoCommit) config.offsetsCommitMaxRetries else 0) // extra attempts only when isAutoCommit is true. NOTE(review): autoCommit() passes false, so despite its name the flag acts as "retry on failure" - confirm intent
+    var done = false
+
+    while (!done) {
+      val committed = offsetsChannelLock synchronized { // committed when we receive either no error codes or only MetadataTooLarge errors
+        val offsetsToCommit = mutable.Map(topicRegistry.flatMap { case (topic, partitionTopicInfos) =>
+          partitionTopicInfos.filterNot { case (partition, info) =>
+            val newOffset = info.getConsumeOffset()
+            newOffset == checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))
+          }.map { case (partition, info) =>
+            TopicAndPartition(info.topic, info.partitionId) -> OffsetAndMetadata(info.getConsumeOffset())
+          }
+        }.toSeq:_*)
+
+        if (offsetsToCommit.size > 0) {
+          if (config.offsetsStorage == "zookeeper") {
+            offsetsToCommit.foreach { case(topicAndPartition, offsetAndMetadata) =>
+              commitOffsetToZooKeeper(topicAndPartition, offsetAndMetadata.offset)
+              checkpointedOffsets.put(topicAndPartition, offsetAndMetadata.offset)
+            }
+            true
+          } else {
+            val offsetCommitRequest = OffsetCommitRequest(config.groupId, offsetsToCommit, clientId = config.clientId)
+            ensureOffsetManagerConnected()
+            try {
+              kafkaCommitMeter.mark(offsetsToCommit.size)
+              offsetsChannel.send(offsetCommitRequest)
+              val offsetCommitResponse = OffsetCommitResponse.readFrom(offsetsChannel.receive().buffer)
+              trace("Offset commit response: %s.".format(offsetCommitResponse))
+
+              val (commitFailed, retryableIfFailed, shouldRefreshCoordinator, errorCount) = {
+                offsetCommitResponse.commitStatus.foldLeft(false, false, false, 0) { case(folded, (topicPartition, errorCode)) =>
+
+                  if (errorCode == ErrorMapping.NoError) {
+                    val offset = offsetsToCommit(topicPartition).offset
+                    checkpointedOffsets.put(topicPartition, offset)
+                    if (config.dualCommitEnabled) {
+                      commitOffsetToZooKeeper(topicPartition, offset)
+                    }
+                  }
+
+                  (folded._1 || // update commitFailed
+                     errorCode != ErrorMapping.NoError,
+
+                  folded._2 || // update retryableIfFailed - (only metadata too large is not retryable)
+                    (errorCode != ErrorMapping.NoError && errorCode != ErrorMapping.OffsetMetadataTooLargeCode),
+
+                  folded._3 || // update shouldRefreshCoordinator
+                    errorCode == ErrorMapping.NotCoordinatorForConsumerCode ||
+                    errorCode == ErrorMapping.ConsumerCoordinatorNotAvailableCode,
+
+                  // update error count
+                  folded._4 + (if (errorCode != ErrorMapping.NoError) 1 else 0))
+                }
+              }
+              debug(errorCount + " errors in offset commit response.")
+
+
+              if (shouldRefreshCoordinator) {
+                debug("Could not commit offsets (because offset coordinator has moved or is unavailable).")
+                offsetsChannel.disconnect()
+              }
+
+              if (commitFailed && retryableIfFailed)
+                false
+              else
+                true
+            }
+            catch {
+              case t: Throwable =>
+                error("Error while committing offsets.", t)
+                offsetsChannel.disconnect()
+                false
+            }
+          }
+        } else {
+          debug("No updates to offsets since last commit.")
+          true
+        }
+      }
+
+      done = if (isShuttingDown.get() && isAutoCommit) { // should not retry indefinitely if shutting down. NOTE(review): retries happen only in this branch - in every other case the loop ends after a single attempt; confirm intent
+        retriesRemaining -= 1
+        retriesRemaining == 0 || committed
+      } else
+        true
+
+      if (!done) {
+        debug("Retrying offset commit in %d ms".format(config.offsetsChannelBackoffMs))
+        Thread.sleep(config.offsetsChannelBackoffMs)
+      }
+    }
+  }
+
+  private def fetchOffsetFromZooKeeper(topicPartition: TopicAndPartition) = {
+    val dirs = new ZKGroupTopicDirs(config.groupId, topicPartition.topic)
+    val offsetString = readDataMaybeNull(zkClient, dirs.consumerOffsetDir + "/" + topicPartition.partition)._1
+    offsetString match {
+      case Some(offsetStr) => (topicPartition, OffsetMetadataAndError(offsetStr.toLong, OffsetAndMetadata.NoMetadata, ErrorMapping.NoError))
+      case None => (topicPartition, OffsetMetadataAndError.NoOffset)
     }
-    for ((topic, infos) <- topicRegistry) {
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      for (info <- infos.values) {
-        val newOffset = info.getConsumeOffset
-        if (newOffset != checkpointedOffsets.get(TopicAndPartition(topic, info.partitionId))) {
+  }
+
+  private def fetchOffsets(partitions: Seq[TopicAndPartition]) = {
+    if (partitions.isEmpty)
+      Some(OffsetFetchResponse(Map.empty))
+    else if (config.offsetsStorage == "zookeeper") {
+      val offsets = partitions.map(fetchOffsetFromZooKeeper)
+      Some(OffsetFetchResponse(immutable.Map(offsets:_*)))
+    } else {
+      val offsetFetchRequest = OffsetFetchRequest(groupId = config.groupId, requestInfo = partitions, clientId = config.clientId)
+
+      var offsetFetchResponseOpt: Option[OffsetFetchResponse] = None
+      while (!isShuttingDown.get && !offsetFetchResponseOpt.isDefined) {
+        offsetFetchResponseOpt = offsetsChannelLock synchronized {
+          ensureOffsetManagerConnected()
           try {
-            updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + info.partitionId, newOffset.toString)
-            checkpointedOffsets.put(TopicAndPartition(topic, info.partitionId), newOffset)
-          } catch {
-            case t: Throwable =>
-              // log it and let it go
-              warn("exception during commitOffsets",  t)
+            offsetsChannel.send(offsetFetchRequest)
+            val offsetFetchResponse = OffsetFetchResponse.readFrom(offsetsChannel.receive().buffer)
+            trace("Offset fetch response: %s.".format(offsetFetchResponse))
+
+            val (leaderChanged, loadInProgress) =
+              offsetFetchResponse.requestInfo.foldLeft(false, false) { case(folded, (topicPartition, offsetMetadataAndError)) =>
+                (folded._1 || (offsetMetadataAndError.error == ErrorMapping.NotCoordinatorForConsumerCode),
+                 folded._2 || (offsetMetadataAndError.error == ErrorMapping.OffsetsLoadInProgressCode))
+              }
+
+            if (leaderChanged) {
+              offsetsChannel.disconnect()
+              debug("Could not fetch offsets (because offset manager has moved).")
+              None // retry
+            }
+            else if (loadInProgress) {
+              debug("Could not fetch offsets (because offset cache is being loaded).")
+              None // retry
+            }
+            else {
+              if (config.dualCommitEnabled) {
+                // if dual-commit is enabled (i.e., if a consumer group is migrating offsets to kafka), then pick the
+                // maximum between offsets in zookeeper and kafka.
+                val kafkaOffsets = offsetFetchResponse.requestInfo
+                val mostRecentOffsets = kafkaOffsets.map { case (topicPartition, kafkaOffset) =>
+                  val zkOffset = fetchOffsetFromZooKeeper(topicPartition)._2.offset
+                  val mostRecentOffset = zkOffset.max(kafkaOffset.offset)
+                  (topicPartition, OffsetMetadataAndError(mostRecentOffset, kafkaOffset.metadata, ErrorMapping.NoError))
+                }
+                Some(OffsetFetchResponse(mostRecentOffsets))
+              }
+              else
+                Some(offsetFetchResponse)
+            }
+          }
+          catch {
+            case e: Exception =>
+              error("Error while fetching offsets from %s:%d.".format(offsetsChannel.host, offsetsChannel.port), e)
+              offsetsChannel.disconnect()
+              None // retry
           }
-          debug("Committed offset " + newOffset + " for topic " + info)
+        }
+
+        if (offsetFetchResponseOpt.isEmpty) {
+          debug("Retrying offset fetch in %d ms".format(config.offsetsChannelBackoffMs))
+          Thread.sleep(config.offsetsChannelBackoffMs)
         }
       }
+
+      offsetFetchResponseOpt
     }
   }
 
+
   class ZKSessionExpireListener(val dirs: ZKGroupDirs,
                                  val consumerIdString: String,
                                  val topicCount: TopicCount,
@@ -433,8 +612,9 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def rebalance(cluster: Cluster): Boolean = {
-      val myTopicThreadIdsMap = TopicCount.constructTopicCount(group, consumerIdString, zkClient).getConsumerThreadIdsPerTopic
-      val consumersPerTopicMap = getConsumersPerTopic(zkClient, group)
+      val myTopicThreadIdsMap = TopicCount.constructTopicCount(
+        group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic
+      val consumersPerTopicMap = getConsumersPerTopic(zkClient, group, config.excludeInternalTopics)
       val brokers = getAllBrokersInCluster(zkClient)
       if (brokers.size == 0) {
         // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started.
@@ -458,13 +638,12 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
 
         releasePartitionOwnership(topicRegistry)
 
-        var partitionOwnershipDecision = new collection.mutable.HashMap[(String, Int), String]()
+        var partitionOwnershipDecision = new collection.mutable.HashMap[TopicAndPartition, String]()
         val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
 
         for ((topic, consumerThreadIdSet) <- myTopicThreadIdsMap) {
           currentTopicRegistry.put(topic, new Pool[Int, PartitionTopicInfo])
 
-          val topicDirs = new ZKGroupTopicDirs(group, topic)
           val curConsumers = consumersPerTopicMap.get(topic).get
           val curPartitions: Seq[Int] = partitionsPerTopicMap.get(topic).get
 
@@ -490,27 +669,42 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
               for (i <- startPart until startPart + nParts) {
                 val partition = curPartitions(i)
                 info(consumerThreadId + " attempting to claim partition " + partition)
-                addPartitionTopicInfo(currentTopicRegistry, topicDirs, partition, topic, consumerThreadId)
                 // record the partition ownership decision
-                partitionOwnershipDecision += ((topic, partition) -> consumerThreadId)
+                partitionOwnershipDecision += (TopicAndPartition(topic, partition) -> consumerThreadId)
               }
             }
           }
         }
 
-        /**
-         * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
-         * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
-         */
-        if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
-          info("Updating the cache")
-          debug("Partitions per topic cache " + partitionsPerTopicMap)
-          debug("Consumers per topic cache " + consumersPerTopicMap)
-          topicRegistry = currentTopicRegistry
-          updateFetcher(cluster)
-          true
-        } else {
+        // fetch current offsets for all topic-partitions
+        val topicPartitions = partitionOwnershipDecision.keySet.toSeq
+        val offsetFetchResponseOpt = fetchOffsets(topicPartitions)
+
+        if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined)
           false
+        else {
+          val offsetFetchResponse = offsetFetchResponseOpt.get
+          topicPartitions.foreach { topicAndPartition =>
+            val (topic, partition) = topicAndPartition.asTuple
+            val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset
+            val threadId = partitionOwnershipDecision(topicAndPartition)
+            addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId)
+          }
+
+          /**
+           * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt
+           * A rebalancing attempt is completed successfully only after the fetchers have been started correctly
+           */
+          if(reflectPartitionOwnershipDecision(partitionOwnershipDecision.toMap)) {
+            info("Updating the cache")
+            debug("Partitions per topic cache " + partitionsPerTopicMap)
+            debug("Consumers per topic cache " + consumersPerTopicMap)
+            topicRegistry = currentTopicRegistry
+            updateFetcher(cluster)
+            true
+          } else {
+            false
+          }
         }
       }
     }
@@ -533,7 +727,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
           * successfully and the fetchers restart to fetch more data chunks
           **/
         if (config.autoCommitEnable)
-          commitOffsets
+          commitOffsets()
         case None =>
       }
     }
@@ -578,11 +772,11 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
       }
     }
 
-    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[(String, Int), String]): Boolean = {
+    private def reflectPartitionOwnershipDecision(partitionOwnershipDecision: Map[TopicAndPartition, String]): Boolean = {
       var successfullyOwnedPartitions : List[(String, Int)] = Nil
       val partitionOwnershipSuccessful = partitionOwnershipDecision.map { partitionOwner =>
-        val topic = partitionOwner._1._1
-        val partition = partitionOwner._1._2
+        val topic = partitionOwner._1.topic
+        val partition = partitionOwner._1.partition
         val consumerThreadId = partitionOwner._2
         val partitionOwnerPath = getConsumerPartitionOwnerPath(group, topic, partition)
         try {
@@ -609,18 +803,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     }
 
     private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]],
-                                      topicDirs: ZKGroupTopicDirs, partition: Int,
-                                      topic: String, consumerThreadId: String) {
+                                      partition: Int, topic: String,
+                                      offset: Long, consumerThreadId: String) {
       val partTopicInfoMap = currentTopicRegistry.get(topic)
 
-      val znode = topicDirs.consumerOffsetDir + "/" + partition
-      val offsetString = readDataMaybeNull(zkClient, znode)._1
-      // If first time starting a consumer, set the initial offset to -1
-      val offset =
-        offsetString match {
-          case Some(offsetStr) => offsetStr.toLong
-          case None => PartitionTopicInfo.InvalidOffset
-        }
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)
@@ -746,10 +932,10 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
      // bootstrap with existing topics
     private var wildcardTopics =
       getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
-        .filter(topicFilter.isTopicAllowed)
+        .filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
 
     private val wildcardTopicCount = TopicCount.constructTopicCount(
-      consumerIdString, topicFilter, numStreams, zkClient)
+      consumerIdString, topicFilter, numStreams, zkClient, config.excludeInternalTopics)
 
     val dirs = new ZKGroupDirs(config.groupId)
     registerConsumerInZK(dirs, consumerIdString, wildcardTopicCount)
@@ -764,7 +950,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     def handleTopicEvent(allTopics: Seq[String]) {
       debug("Handling topic event")
 
-      val updatedTopics = allTopics.filter(topicFilter.isTopicAllowed)
+      val updatedTopics = allTopics.filter(topic => topicFilter.isTopicAllowed(topic, config.excludeInternalTopics))
 
       val addedTopics = updatedTopics filterNot (wildcardTopics contains)
       if (addedTopics.nonEmpty)

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 4deff9d..5db24a7 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -604,7 +604,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     }
   }
 
-  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = true) {
+  def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
     info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
     try {
       controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
@@ -1116,7 +1116,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
                       controllerContext.partitionsUndergoingPreferredReplicaElection.size == 0 &&
                       !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) &&
                       controllerContext.allTopics.contains(topicPartition.topic)) {
-                    onPreferredReplicaElection(Set(topicPartition), false)
+                    onPreferredReplicaElection(Set(topicPartition), true)
                   }
                 }
               }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
new file mode 100644
index 0000000..dfa9c42
--- /dev/null
+++ b/core/src/main/scala/kafka/javaapi/ConsumerMetadataResponse.scala
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.javaapi
+
+import kafka.cluster.Broker
+
+class ConsumerMetadataResponse(private val underlying: kafka.api.ConsumerMetadataResponse) { // javaapi wrapper; every member delegates to the wrapped kafka.api.ConsumerMetadataResponse
+
+  def errorCode = underlying.errorCode
+
+  def coordinator: Broker = {
+    import kafka.javaapi.Implicits._ // NOTE(review): presumably supplies the implicit conversion of underlying.coordinator to Broker - confirm
+    underlying.coordinator
+  }
+
+  override def equals(other: Any) = canEqual(other) && { // equal when other is also a javaapi ConsumerMetadataResponse and the wrapped responses are equal
+    val otherConsumerMetadataResponse = other.asInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
+    this.underlying.equals(otherConsumerMetadataResponse.underlying)
+  }
+
+  def canEqual(other: Any) = other.isInstanceOf[kafka.javaapi.ConsumerMetadataResponse]
+
+  override def hashCode = underlying.hashCode // consistent with equals: both are derived from the wrapped response
+
+  override def toString = underlying.toString
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
index 57b9d2a..6de320d 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitRequest.scala
@@ -17,19 +17,18 @@
 
 package kafka.javaapi
 
-import kafka.common.{TopicAndPartition, OffsetMetadataAndError}
-import collection.JavaConversions
-import java.nio.ByteBuffer
+import kafka.common.{OffsetAndMetadata, TopicAndPartition}
 
 class OffsetCommitRequest(groupId: String,
-                          requestInfo: java.util.Map[TopicAndPartition, OffsetMetadataAndError],
+                          requestInfo: java.util.Map[TopicAndPartition, OffsetAndMetadata],
                           versionId: Short,
                           correlationId: Int,
                           clientId: String) {
   val underlying = {
-    val scalaMap: Map[TopicAndPartition, OffsetMetadataAndError] = {
-      import JavaConversions._
-      requestInfo.toMap
+    val scalaMap: collection.mutable.Map[TopicAndPartition, OffsetAndMetadata] = {
+      import collection.JavaConversions._
+
+      collection.JavaConversions.asMap(requestInfo)
     }
     kafka.api.OffsetCommitRequest(
       groupId = groupId,

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
index 570bf31..c2d3d11 100644
--- a/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
+++ b/core/src/main/scala/kafka/javaapi/OffsetCommitResponse.scala
@@ -24,7 +24,7 @@ class OffsetCommitResponse(private val underlying: kafka.api.OffsetCommitRespons
 
   def errors: java.util.Map[TopicAndPartition, Short] = {
     import JavaConversions._
-    underlying.requestInfo
+    underlying.commitStatus
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
index c45c803..44d3d35 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
+++ b/core/src/main/scala/kafka/javaapi/consumer/ConsumerConnector.java
@@ -18,13 +18,12 @@
 package kafka.javaapi.consumer;
 
 
+import java.util.List;
+import java.util.Map;
 import kafka.consumer.KafkaStream;
 import kafka.consumer.TopicFilter;
 import kafka.serializer.Decoder;
 
-import java.util.List;
-import java.util.Map;
-
 public interface ConsumerConnector {
   /**
    *  Create a list of MessageStreams of type T for each topic.
@@ -62,6 +61,7 @@ public interface ConsumerConnector {
    *  Commit the offsets of all broker partitions connected by this connector.
    */
   public void commitOffsets();
+  public void commitOffsets(boolean retryOnFailure);
 
   /**
    *  Shut down the connector

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
index 58e83f6..1f95d9b 100644
--- a/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/javaapi/consumer/ZookeeperConsumerConnector.scala
@@ -101,9 +101,13 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
     
   def createMessageStreamsByFilter(topicFilter: TopicFilter) = 
     createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder())
-    
+
   def commitOffsets() {
-    underlying.commitOffsets
+    underlying.commitOffsets()
+  }
+
+  def commitOffsets(retryOnFailure: Boolean) {
+    underlying.commitOffsets(retryOnFailure)
   }
 
   def shutdown() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a670537a/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index e1f8b97..b2652dd 100644
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -255,8 +255,8 @@ class FileMessageSet private[kafka](@volatile var file: File,
   /**
    * Read from the underlying file into the buffer starting at the given position
    */
-  def readInto(buffer: ByteBuffer, position: Int): ByteBuffer = {
-    channel.read(buffer, position)
+  def readInto(buffer: ByteBuffer, relativePosition: Int): ByteBuffer = {
+    channel.read(buffer, relativePosition + this.start)
     buffer.flip()
     buffer
   }