Posted to commits@kafka.apache.org by jg...@apache.org on 2018/06/22 00:19:28 UTC

[kafka] branch trunk updated: KAFKA-4682; Revise expiration semantics of consumer group offsets (KIP-211 - Part 1) (#4896)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 418a91b  KAFKA-4682; Revise expiration semantics of consumer group offsets (KIP-211 - Part 1) (#4896)
418a91b is described below

commit 418a91b5d4e3a0579b91d286f61c2b63c5b4a9b6
Author: Vahid Hashemian <va...@us.ibm.com>
AuthorDate: Thu Jun 21 17:19:24 2018 -0700

    KAFKA-4682; Revise expiration semantics of consumer group offsets (KIP-211 - Part 1) (#4896)
    
    This patch contains the improved offset expiration semantics proposed in KIP-211. Committed offsets will not be expired as long as a group is active. Once all members have left the group, then offsets will be expired after the timeout configured by `offsets.retention.minutes`. Note that the optimization for early expiration of unsubscribed topics will be implemented in a separate patch.
---
 .../consumer/internals/ConsumerCoordinator.java    |   3 +-
 .../kafka/common/requests/OffsetCommitRequest.java |  20 +-
 .../common/requests/OffsetCommitResponse.java      |   4 +-
 .../kafka/common/requests/RequestResponseTest.java |  14 +-
 core/src/main/scala/kafka/api/ApiVersion.scala     |  11 +-
 .../kafka/common/OffsetMetadataAndError.scala      |   6 +-
 .../kafka/coordinator/group/GroupCoordinator.scala |   6 +-
 .../kafka/coordinator/group/GroupMetadata.scala    |  62 +++-
 .../coordinator/group/GroupMetadataManager.scala   | 151 ++++++---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  30 +-
 .../main/scala/kafka/tools/ConsoleConsumer.scala   |   4 +
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   4 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../test/scala/unit/kafka/api/ApiVersionTest.scala |   3 +
 .../group/GroupMetadataManagerTest.scala           | 358 ++++++++++++++++++---
 .../coordinator/group/GroupMetadataTest.scala      |   5 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 docs/upgrade.html                                  |  19 +-
 18 files changed, 545 insertions(+), 159 deletions(-)
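
In outline, the expiration rule this patch implements can be sketched as follows (a minimal Scala sketch of the semantics, not the actual coordinator code; all names are illustrative):

    // Offsets of a consumer group are retained while the group has members.
    // Once the group becomes Empty, they expire offsets.retention.minutes later.
    def shouldExpire(groupIsEmpty: Boolean,
                     emptySinceMs: Option[Long], // when the group became Empty
                     lastCommitMs: Long,         // fallback for old metadata without a state timestamp
                     nowMs: Long,
                     retentionMs: Long): Boolean =
      groupIsEmpty && nowMs - emptySinceMs.getOrElse(lastCommitMs) >= retentionMs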

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 9c19af1..b484e11 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -798,8 +798,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(this.groupId, offsetData).
                 setGenerationId(generation.generationId).
-                setMemberId(generation.memberId).
-                setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
+                setMemberId(generation.memberId);
 
         log.trace("Sending OffsetCommit request with {} to coordinator {}", offsets, coordinator);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
index 570c4d5..8a51e84 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
@@ -111,9 +111,15 @@ public class OffsetCommitRequest extends AbstractRequest {
      */
     private static final Schema OFFSET_COMMIT_REQUEST_V4 = OFFSET_COMMIT_REQUEST_V3;
 
+    private static final Schema OFFSET_COMMIT_REQUEST_V5 = new Schema(
+            GROUP_ID,
+            GENERATION_ID,
+            MEMBER_ID,
+            new Field(TOPICS_KEY_NAME, new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets."));
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2,
-            OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4};
+            OFFSET_COMMIT_REQUEST_V3, OFFSET_COMMIT_REQUEST_V4, OFFSET_COMMIT_REQUEST_V5};
     }
 
     // default values for the current version
@@ -166,7 +172,6 @@ public class OffsetCommitRequest extends AbstractRequest {
         private final Map<TopicPartition, PartitionData> offsetData;
         private String memberId = DEFAULT_MEMBER_ID;
         private int generationId = DEFAULT_GENERATION_ID;
-        private long retentionTime = DEFAULT_RETENTION_TIME;
 
         public Builder(String groupId, Map<TopicPartition, PartitionData> offsetData) {
             super(ApiKeys.OFFSET_COMMIT);
@@ -184,11 +189,6 @@ public class OffsetCommitRequest extends AbstractRequest {
             return this;
         }
 
-        public Builder setRetentionTime(long retentionTime) {
-            this.retentionTime = retentionTime;
-            return this;
-        }
-
         @Override
         public OffsetCommitRequest build(short version) {
             switch (version) {
@@ -199,8 +199,8 @@ public class OffsetCommitRequest extends AbstractRequest {
                 case 2:
                 case 3:
                 case 4:
-                    long retentionTime = version == 1 ? DEFAULT_RETENTION_TIME : this.retentionTime;
-                    return new OffsetCommitRequest(groupId, generationId, memberId, retentionTime, offsetData, version);
+                case 5:
+                    return new OffsetCommitRequest(groupId, generationId, memberId, DEFAULT_RETENTION_TIME, offsetData, version);
                 default:
                     throw new UnsupportedVersionException("Unsupported version " + version);
             }
@@ -213,7 +213,6 @@ public class OffsetCommitRequest extends AbstractRequest {
                 append(", groupId=").append(groupId).
                 append(", memberId=").append(memberId).
                 append(", generationId=").append(generationId).
-                append(", retentionTime=").append(retentionTime).
                 append(", offsetData=").append(offsetData).
                 append(")");
             return bld.toString();
@@ -316,6 +315,7 @@ public class OffsetCommitRequest extends AbstractRequest {
                 return new OffsetCommitResponse(responseData);
             case 3:
             case 4:
+            case 5:
                 return new OffsetCommitResponse(throttleTimeMs, responseData);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index 0b0b228..c79bc57 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -85,9 +85,11 @@ public class OffsetCommitResponse extends AbstractResponse {
      */
     private static final Schema OFFSET_COMMIT_RESPONSE_V4 = OFFSET_COMMIT_RESPONSE_V3;
 
+    private static final Schema OFFSET_COMMIT_RESPONSE_V5 = OFFSET_COMMIT_RESPONSE_V4;
+
     public static Schema[] schemaVersions() {
         return new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2,
-            OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4};
+            OFFSET_COMMIT_RESPONSE_V3, OFFSET_COMMIT_RESPONSE_V4, OFFSET_COMMIT_RESPONSE_V5};
     }
 
     private final Map<TopicPartition, Errors> responseData;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 6e705d2..da61398 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -141,9 +141,6 @@ public class RequestResponseTest {
         checkErrorResponse(createMetadataRequest(3, singletonList("topic1")), new UnknownServerException());
         checkResponse(createMetadataResponse(), 4);
         checkErrorResponse(createMetadataRequest(4, singletonList("topic1")), new UnknownServerException());
-        checkRequest(createOffsetCommitRequest(2));
-        checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
-        checkResponse(createOffsetCommitResponse(), 0);
         checkRequest(OffsetFetchRequest.forAllPartitions("group1"));
         checkErrorResponse(OffsetFetchRequest.forAllPartitions("group1"), new NotCoordinatorException("Not Coordinator"));
         checkRequest(createOffsetFetchRequest(0));
@@ -210,6 +207,16 @@ public class RequestResponseTest {
         checkErrorResponse(createOffsetCommitRequest(0), new UnknownServerException());
         checkRequest(createOffsetCommitRequest(1));
         checkErrorResponse(createOffsetCommitRequest(1), new UnknownServerException());
+        checkRequest(createOffsetCommitRequest(2));
+        checkErrorResponse(createOffsetCommitRequest(2), new UnknownServerException());
+        checkRequest(createOffsetCommitRequest(3));
+        checkErrorResponse(createOffsetCommitRequest(3), new UnknownServerException());
+        checkRequest(createOffsetCommitRequest(4));
+        checkErrorResponse(createOffsetCommitRequest(4), new UnknownServerException());
+        checkResponse(createOffsetCommitResponse(), 4);
+        checkRequest(createOffsetCommitRequest(5));
+        checkErrorResponse(createOffsetCommitRequest(5), new UnknownServerException());
+        checkResponse(createOffsetCommitResponse(), 5);
         checkRequest(createJoinGroupRequest(0));
         checkRequest(createUpdateMetadataRequest(0, null));
         checkErrorResponse(createUpdateMetadataRequest(0, null), new UnknownServerException());
@@ -817,7 +824,6 @@ public class RequestResponseTest {
         return new OffsetCommitRequest.Builder("group1", commitData)
                 .setGenerationId(100)
                 .setMemberId("consumer1")
-                .setRetentionTime(1000000)
                 .build((short) version);
     }
 
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 9ed6432..485f2bd 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -73,7 +73,9 @@ object ApiVersion {
     // Introduced OffsetsForLeaderEpochRequest V1 via KIP-279
     KAFKA_2_0_IV0,
     // Introduced ApiVersionsRequest V2 via KIP-219
-    KAFKA_2_0_IV1
+    KAFKA_2_0_IV1,
+    // Introduced new schemas for group offset (v2) and group metadata (v2) (KIP-211)
+    KAFKA_2_1_IV0
   )
 
   // Map keys are the union of the short and full versions
@@ -248,4 +250,11 @@ case object KAFKA_2_0_IV1 extends DefaultApiVersion {
   val subVersion = "IV1"
   val recordVersion = RecordVersion.V2
   val id: Int = 16
+}
+
+case object KAFKA_2_1_IV0 extends DefaultApiVersion {
+  val shortVersion: String = "2.1"
+  val subVersion = "IV0"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 18
 }
\ No newline at end of file
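
The new KAFKA_2_1_IV0 formats are only written once the broker's inter-broker protocol version reaches 2.1. For example, in server.properties (standard broker configuration, shown here as a reminder rather than as part of the patch):

    # Enable group metadata v2 and offset commit value v2 after all brokers are upgraded:
    inter.broker.protocol.version=2.1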
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
index 2cf9bb4..afe542c 100644
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
@@ -34,17 +34,17 @@ object OffsetMetadata {
 
 case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
                              commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
-                             expireTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP) {
+                             expireTimestamp: Option[Long] = None) {
 
   def offset = offsetMetadata.offset
 
   def metadata = offsetMetadata.metadata
 
-  override def toString = "[%s,CommitTime %d,ExpirationTime %d]".format(offsetMetadata, commitTimestamp, expireTimestamp)
+  override def toString = s"[$offsetMetadata,CommitTime $commitTimestamp,ExpirationTime ${expireTimestamp.getOrElse("_")}]"
 }
 
 object OffsetAndMetadata {
-  def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, expireTimestamp)
+  def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, Some(expireTimestamp))
 
   def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp)
 
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 9748e17..2cedacd 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -125,7 +125,7 @@ class GroupCoordinator(val brokerId: Int,
           if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
             responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
           } else {
-            val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+            val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
             doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
           }
 
@@ -451,7 +451,7 @@ class GroupCoordinator(val brokerId: Int,
       case Some(error) => responseCallback(offsetMetadata.mapValues(_ => error))
       case None =>
         val group = groupManager.getGroup(groupId).getOrElse {
-          groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+          groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
         }
         doCommitOffsets(group, NoMemberId, NoGeneration, producerId, producerEpoch, offsetMetadata, responseCallback)
     }
@@ -469,7 +469,7 @@ class GroupCoordinator(val brokerId: Int,
           case None =>
             if (generationId < 0) {
               // the group is not relying on Kafka for group management, so allow the commit
-              val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
+              val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
               doCommitOffsets(group, memberId, generationId, NO_PRODUCER_ID, NO_PRODUCER_EPOCH,
                 offsetMetadata, responseCallback)
             } else {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 2b9c91f..d729449 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock
 import kafka.common.OffsetAndMetadata
 import kafka.utils.{CoreUtils, Logging, nonthreadsafe}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Time
 
 import scala.collection.{Seq, immutable, mutable}
 
@@ -118,12 +119,15 @@ private object GroupMetadata {
                 protocolType: String,
                 protocol: String,
                 leaderId: String,
-                members: Iterable[MemberMetadata]): GroupMetadata = {
-    val group = new GroupMetadata(groupId, initialState)
+                currentStateTimestamp: Option[Long],
+                members: Iterable[MemberMetadata],
+                time: Time): GroupMetadata = {
+    val group = new GroupMetadata(groupId, initialState, time)
     group.generationId = generationId
     group.protocolType = if (protocolType == null || protocolType.isEmpty) None else Some(protocolType)
     group.protocol = Option(protocol)
     group.leaderId = Option(leaderId)
+    group.currentStateTimestamp = currentStateTimestamp
     members.foreach(group.add)
     group
   }
@@ -167,10 +171,11 @@ case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offs
  *  3. leader id
  */
 @nonthreadsafe
-private[group] class GroupMetadata(val groupId: String, initialState: GroupState) extends Logging {
+private[group] class GroupMetadata(val groupId: String, initialState: GroupState, time: Time) extends Logging {
   private[group] val lock = new ReentrantLock
 
   private var state: GroupState = initialState
+  var currentStateTimestamp: Option[Long] = Some(time.milliseconds())
   var protocolType: Option[String] = None
   var generationId = 0
   private var leaderId: Option[String] = None
@@ -195,6 +200,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def isLeader(memberId: String): Boolean = leaderId.contains(memberId)
   def leaderOrNull: String = leaderId.orNull
   def protocolOrNull: String = protocol.orNull
+  def currentStateTimestampOrDefault: Long = currentStateTimestamp.getOrElse(-1)
 
   def add(member: MemberMetadata) {
     if (members.isEmpty)
@@ -240,6 +246,7 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
   def transitionTo(groupState: GroupState) {
     assertValidTransition(groupState)
     state = groupState
+    currentStateTimestamp = Some(time.milliseconds())
   }
 
   def selectProtocol: String = {
@@ -434,18 +441,51 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     }.toMap
   }
 
-  def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
-    val expiredOffsets = offsets
-      .filter {
+  def removeExpiredOffsets(currentTimestamp: Long, offsetRetentionMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
+
+    def getExpiredOffsets(baseTimestamp: CommitRecordMetadataAndOffset => Long): Map[TopicPartition, OffsetAndMetadata] = {
+      offsets.filter {
         case (topicPartition, commitRecordMetadataAndOffset) =>
-          commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
-      }
-      .map {
+          !pendingOffsetCommits.contains(topicPartition) && {
+            commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp match {
+              case None =>
+                // current version with no per-partition retention
+                currentTimestamp - baseTimestamp(commitRecordMetadataAndOffset) >= offsetRetentionMs
+              case Some(expireTimestamp) =>
+                // older versions with an explicit expire_timestamp field => the old expiration semantics apply
+                currentTimestamp >= expireTimestamp
+            }
+          }
+      }.map {
         case (topicPartition, commitRecordOffsetAndMetadata) =>
           (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
-      }
+      }.toMap
+    }
+
+    val expiredOffsets: Map[TopicPartition, OffsetAndMetadata] = protocolType match {
+      case Some(_) if is(Empty) =>
+        // no consumers exist in the group =>
+        // - if a current state timestamp exists and the retention period has passed since the group became Empty,
+        //   expire all offsets with no pending offset commit;
+        // - if there is no current state timestamp (old group metadata schema) and the retention period has passed
+        //   since the last commit timestamp, expire the offset
+        getExpiredOffsets(commitRecordMetadataAndOffset =>
+          currentStateTimestamp.getOrElse(commitRecordMetadataAndOffset.offsetAndMetadata.commitTimestamp))
+
+      case None =>
+        // protocolType is None => a standalone (simple) consumer that uses Kafka only for offset storage
+        // expire offsets with no pending offset commit for which the retention period has passed since their last commit
+        getExpiredOffsets(_.offsetAndMetadata.commitTimestamp)
+
+      case _ =>
+        Map()
+    }
+
+    if (expiredOffsets.nonEmpty)
+      debug(s"Expired offsets from group '$groupId': ${expiredOffsets.keySet}")
+
     offsets --= expiredOffsets.keySet
-    expiredOffsets.toMap
+    expiredOffsets
   }
 
   def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
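
A worked example of the rule above, for a group with protocolType = Some("consumer") that becomes Empty at time t0, with retention R (an illustrative timeline, assuming no pending offset commits):

    // t0        : last member leaves; transitionTo(Empty) sets currentStateTimestamp = Some(t0)
    // t0 + R - 1: removeExpiredOffsets(t0 + R - 1, R) returns Map.empty -- nothing expires yet
    // t0 + R    : removeExpiredOffsets(t0 + R, R) returns (and removes) every remaining offset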
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 02ba13a..6bd0a5a 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
@@ -197,18 +197,11 @@ class GroupMetadataManager(brokerId: Int,
                  responseCallback: Errors => Unit): Unit = {
     getMagic(partitionFor(group.groupId)) match {
       case Some(magicValue) =>
-        val groupMetadataValueVersion = {
-          if (interBrokerProtocolVersion < KAFKA_0_10_1_IV0)
-            0.toShort
-          else
-            GroupMetadataManager.CURRENT_GROUP_VALUE_SCHEMA_VERSION
-        }
-
         // We always use CREATE_TIME, like the producer. The conversion to LOG_APPEND_TIME (if necessary) happens automatically.
         val timestampType = TimestampType.CREATE_TIME
         val timestamp = time.milliseconds()
         val key = GroupMetadataManager.groupMetadataKey(group.groupId)
-        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, version = groupMetadataValueVersion)
+        val value = GroupMetadataManager.groupMetadataValue(group, groupAssignment, interBrokerProtocolVersion)
 
         val records = {
           val buffer = ByteBuffer.allocate(AbstractRecords.estimateSizeInBytes(magicValue, compressionType,
@@ -330,7 +323,7 @@ class GroupMetadataManager(brokerId: Int,
 
           val records = filteredOffsetMetadata.map { case (topicPartition, offsetAndMetadata) =>
             val key = GroupMetadataManager.offsetCommitKey(group.groupId, topicPartition)
-            val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+            val value = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, interBrokerProtocolVersion)
             new SimpleRecord(timestamp, key, value)
           }
           val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId))
@@ -580,7 +573,7 @@ class GroupMetadataManager(brokerId: Int,
                   case groupMetadataKey: GroupMetadataKey =>
                     // load group metadata
                     val groupId = groupMetadataKey.key
-                    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+                    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
                     if (groupMetadata != null) {
                       removedGroups.remove(groupId)
                       loadedGroups.put(groupId, groupMetadata)
@@ -630,7 +623,7 @@ class GroupMetadataManager(brokerId: Int,
         // load groups which store offsets in kafka, but which have no active members and thus no group
         // metadata stored in the log
         (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { groupId =>
-          val group = new GroupMetadata(groupId, initialState = Empty)
+          val group = new GroupMetadata(groupId, Empty, time)
           val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
           val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
           debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
@@ -653,18 +646,8 @@ class GroupMetadataManager(brokerId: Int,
                         pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
     // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent
     // view of the group's offsets
-    val loadedOffsets = offsets.mapValues { case CommitRecordMetadataAndOffset(commitRecordOffset, offsetAndMetadata) =>
-      // special handling for version 0:
-      // set the expiration time stamp as commit time stamp + server default retention time
-      val updatedOffsetAndMetadata =
-        if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
-        offsetAndMetadata.copy(expireTimestamp = offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs)
-      else
-        offsetAndMetadata
-      CommitRecordMetadataAndOffset(commitRecordOffset, updatedOffsetAndMetadata)
-    }
-    trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}")
-    group.initializeOffsets(loadedOffsets, pendingTransactionalOffsets.toMap)
+    trace(s"Initialized offsets $offsets for group ${group.groupId}")
+    group.initializeOffsets(offsets, pendingTransactionalOffsets.toMap)
 
     val currentGroup = addGroup(group)
     if (group != currentGroup)
@@ -711,11 +694,11 @@ class GroupMetadataManager(brokerId: Int,
 
   // visible for testing
   private[group] def cleanupGroupMetadata(): Unit = {
-    val startMs = time.milliseconds()
-    val offsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
-      group.removeExpiredOffsets(time.milliseconds())
+    val currentTimestamp = time.milliseconds()
+    val numOffsetsRemoved = cleanupGroupMetadata(groupMetadataCache.values, group => {
+      group.removeExpiredOffsets(currentTimestamp, config.offsetsRetentionMs)
     })
-    info(s"Removed $offsetsRemoved expired offsets in ${time.milliseconds() - startMs} milliseconds.")
+    info(s"Removed $numOffsetsRemoved expired offsets in ${time.milliseconds() - currentTimestamp} milliseconds.")
   }
 
   /**
@@ -917,7 +900,7 @@ class GroupMetadataManager(brokerId: Int,
  *    -> value version 1:       [offset, metadata, commit_timestamp, expire_timestamp]
  *
  * key version 2:       group metadata
- *     -> value version 0:       [protocol_type, generation, protocol, leader, members]
+ *    -> value version 0:       [protocol_type, generation, protocol, leader, members]
  */
 object GroupMetadataManager {
 
@@ -947,6 +930,13 @@ object GroupMetadataManager {
   private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("commit_timestamp")
   private val OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1 = OFFSET_COMMIT_VALUE_SCHEMA_V1.get("expire_timestamp")
 
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V2 = new Schema(new Field("offset", INT64),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("offset")
+  private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
+
   private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
 
@@ -975,10 +965,13 @@ object GroupMetadataManager {
     new Field(SUBSCRIPTION_KEY, BYTES),
     new Field(ASSIGNMENT_KEY, BYTES))
 
+  private val MEMBER_METADATA_V2 = MEMBER_METADATA_V1
+
   private val PROTOCOL_TYPE_KEY = "protocol_type"
   private val GENERATION_KEY = "generation"
   private val PROTOCOL_KEY = "protocol"
   private val LEADER_KEY = "leader"
+  private val CURRENT_STATE_TIMESTAMP_KEY = "current_state_timestamp"
   private val MEMBERS_KEY = "members"
 
   private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
@@ -995,6 +988,14 @@ object GroupMetadataManager {
     new Field(LEADER_KEY, NULLABLE_STRING),
     new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
 
+  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
+    new Field(PROTOCOL_TYPE_KEY, STRING),
+    new Field(GENERATION_KEY, INT32),
+    new Field(PROTOCOL_KEY, NULLABLE_STRING),
+    new Field(LEADER_KEY, NULLABLE_STRING),
+    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
+    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
+
 
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
@@ -1005,19 +1006,20 @@ object GroupMetadataManager {
   // map of version of offset value schemas
   private val OFFSET_VALUE_SCHEMAS = Map(
     0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
-    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1)
-  private val CURRENT_OFFSET_VALUE_SCHEMA_VERSION = 1.toShort
+    1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
+    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2)
 
   // map of version of group metadata value schemas
   private val GROUP_VALUE_SCHEMAS = Map(
     0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
-    1 -> GROUP_METADATA_VALUE_SCHEMA_V1)
-  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 1.toShort
+    1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
+    2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
+  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 2.toShort
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
 
-  private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+  private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(2)
   private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
 
   private def schemaForKey(version: Int) = {
@@ -1081,17 +1083,34 @@ object GroupMetadataManager {
    * Generates the payload for offset commit message from given offset and metadata
    *
    * @param offsetAndMetadata consumer's current offset and metadata
+   * @param apiVersion the api version
    * @return payload for offset commit message
    */
-  private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata): Array[Byte] = {
-    // generate commit value with schema version 1
-    val value = new Struct(CURRENT_OFFSET_VALUE_SCHEMA)
-    value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
-    value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
-    value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
-    value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp)
+  private[group] def offsetCommitValue(offsetAndMetadata: OffsetAndMetadata,
+                                       apiVersion: ApiVersion): Array[Byte] = {
+    // generate commit value according to schema version
+    val (version, value) = {
+      if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
+        // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
+        (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
+      else
+        (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
+    }
+
+    if (version == 2) {
+      value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
+      value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
+      value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
+    } else {
+      value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+      value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+      value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+      // version 1 has a non-empty expireTimestamp field
+      value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.get)
+    }
+
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(CURRENT_OFFSET_VALUE_SCHEMA_VERSION)
+    byteBuffer.putShort(version)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }
@@ -1102,19 +1121,30 @@ object GroupMetadataManager {
    *
    * @param groupMetadata current group metadata
    * @param assignment the assignment for the rebalancing generation
-   * @param version the version of the value message to use
+   * @param apiVersion the api version
    * @return payload for offset commit message
    */
   private[group] def groupMetadataValue(groupMetadata: GroupMetadata,
                                         assignment: Map[String, Array[Byte]],
-                                        version: Short = 0): Array[Byte] = {
-    val value = if (version == 0) new Struct(GROUP_METADATA_VALUE_SCHEMA_V0) else new Struct(CURRENT_GROUP_VALUE_SCHEMA)
+                                        apiVersion: ApiVersion): Array[Byte] = {
+
+    val (version, value) = {
+      if (apiVersion < KAFKA_0_10_1_IV0)
+        (0.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V0))
+      else if (apiVersion < KAFKA_2_1_IV0)
+        (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
+      else
+        (2.toShort, new Struct(CURRENT_GROUP_VALUE_SCHEMA))
+    }
 
     value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
     value.set(GENERATION_KEY, groupMetadata.generationId)
     value.set(PROTOCOL_KEY, groupMetadata.protocolOrNull)
     value.set(LEADER_KEY, groupMetadata.leaderOrNull)
 
+    if (version >= 2)
+      value.set(CURRENT_STATE_TIMESTAMP_KEY, groupMetadata.currentStateTimestampOrDefault)
+
     val memberArray = groupMetadata.allMemberMetadata.map { memberMetadata =>
       val memberStruct = value.instance(MEMBERS_KEY)
       memberStruct.set(MEMBER_ID_KEY, memberMetadata.memberId)
@@ -1174,7 +1204,7 @@ object GroupMetadataManager {
 
       GroupMetadataKey(version, group)
     } else {
-      throw new IllegalStateException("Unknown version " + version + " for group metadata message")
+      throw new IllegalStateException(s"Unknown group metadata message version: $version")
     }
   }
 
@@ -1205,8 +1235,14 @@ object GroupMetadataManager {
         val expireTimestamp = value.get(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1).asInstanceOf[Long]
 
         OffsetAndMetadata(offset, metadata, commitTimestamp, expireTimestamp)
+      } else if (version == 2) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V2).asInstanceOf[Long]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V2).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
+
+        OffsetAndMetadata(offset, metadata, commitTimestamp)
       } else {
-        throw new IllegalStateException("Unknown offset message version")
+        throw new IllegalStateException(s"Unknown offset message version: $version")
       }
     }
   }
@@ -1215,9 +1251,10 @@ object GroupMetadataManager {
    * Decodes the group metadata messages' payload and retrieves its member metadata from it
    *
    * @param buffer input byte-buffer
+   * @param time the time instance to use
    * @return a group metadata object from the message
    */
-  def readGroupMessageValue(groupId: String, buffer: ByteBuffer): GroupMetadata = {
+  def readGroupMessageValue(groupId: String, buffer: ByteBuffer, time: Time): GroupMetadata = {
     if (buffer == null) { // tombstone
       null
     } else {
@@ -1225,13 +1262,23 @@ object GroupMetadataManager {
       val valueSchema = schemaForGroup(version)
       val value = valueSchema.read(buffer)
 
-      if (version == 0 || version == 1) {
+      if (version >= 0 && version <= 2) {
         val generationId = value.get(GENERATION_KEY).asInstanceOf[Int]
         val protocolType = value.get(PROTOCOL_TYPE_KEY).asInstanceOf[String]
         val protocol = value.get(PROTOCOL_KEY).asInstanceOf[String]
         val leaderId = value.get(LEADER_KEY).asInstanceOf[String]
         val memberMetadataArray = value.getArray(MEMBERS_KEY)
         val initialState = if (memberMetadataArray.isEmpty) Empty else Stable
+        val currentStateTimestamp: Option[Long] = version match {
+          case version if version == 2 =>
+            if (value.hasField(CURRENT_STATE_TIMESTAMP_KEY)) {
+              val timestamp = value.getLong(CURRENT_STATE_TIMESTAMP_KEY)
+              if (timestamp == -1) None else Some(timestamp)
+            } else
+              None
+          case _ =>
+            None
+        }
 
         val members = memberMetadataArray.map { memberMetadataObj =>
           val memberMetadata = memberMetadataObj.asInstanceOf[Struct]
@@ -1247,9 +1294,9 @@ object GroupMetadataManager {
           member.assignment = Utils.toArray(memberMetadata.get(ASSIGNMENT_KEY).asInstanceOf[ByteBuffer])
           member
         }
-        GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, members)
+        GroupMetadata.loadGroup(groupId, initialState, generationId, protocolType, protocol, leaderId, currentStateTimestamp, members, time)
       } else {
-        throw new IllegalStateException("Unknown group metadata message version")
+        throw new IllegalStateException(s"Unknown group metadata message version: $version")
       }
     }
   }
@@ -1287,7 +1334,7 @@ object GroupMetadataManager {
           val value = consumerRecord.value
           val formattedValue =
             if (value == null) "NULL"
-            else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value)).toString
+            else GroupMetadataManager.readGroupMessageValue(groupId, ByteBuffer.wrap(value), Time.SYSTEM).toString
           output.write(groupId.getBytes(StandardCharsets.UTF_8))
           output.write("::".getBytes(StandardCharsets.UTF_8))
           output.write(formattedValue.getBytes(StandardCharsets.UTF_8))
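
To summarize the write-path version selection introduced above (derived from offsetCommitValue and groupMetadataValue):

    inter.broker.protocol.version      offset commit value       group metadata value
    ---------------------------------  ------------------------  ---------------------------------
    < 0.10.1-IV0                       v1                        v0
    >= 0.10.1-IV0 and < 2.1-IV0        v1                        v1
    >= 2.1-IV0                         v2 (no expire timestamp)  v2 (adds current_state_timestamp)

    (On any version, an offset carrying an explicit expire timestamp is still written with v1.)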
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 0c88be9..de9e0bf 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -332,33 +332,25 @@ class KafkaApis(val requestChannel: RequestChannel,
       } else {
         // for version 1 and beyond store offsets in offset manager
 
-        // compute the retention time based on the request version:
-        // if it is v1 or not specified by user, we can use the default retention
-        val offsetRetention =
-          if (header.apiVersion <= 1 ||
-            offsetCommitRequest.retentionTime == OffsetCommitRequest.DEFAULT_RETENTION_TIME)
-            groupCoordinator.offsetConfig.offsetsRetentionMs
-          else
-            offsetCommitRequest.retentionTime
-
         // commit timestamp is always set to now.
         // "default" expiration timestamp is now + retention (and retention may be overridden if v2)
         // expire timestamp is computed differently for v1 and v2.
-        //   - If v1 and no explicit commit timestamp is provided we use default expiration timestamp.
+        //   - If v1 and no explicit commit timestamp is provided we treat it the same as v5.
         //   - If v1 and explicit commit timestamp is provided we calculate retention from that explicit commit timestamp
-        //   - If v2 we use the default expiration timestamp
+        //   - If v2/v3/v4 (no explicit commit timestamp) we treat it the same as v5.
+        //   - For v5 and beyond there is no per-partition expiration timestamp, so this field is no longer in effect
         val currentTimestamp = time.milliseconds
-        val defaultExpireTimestamp = offsetRetention + currentTimestamp
         val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
           val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
           new OffsetAndMetadata(
             offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
-            commitTimestamp = currentTimestamp,
-            expireTimestamp = {
-              if (partitionData.timestamp == OffsetCommitRequest.DEFAULT_TIMESTAMP)
-                defaultExpireTimestamp
-              else
-                offsetRetention + partitionData.timestamp
+            commitTimestamp = partitionData.timestamp match {
+              case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
+              case customTimestamp => customTimestamp
+            },
+            expireTimestamp = offsetCommitRequest.retentionTime match {
+              case OffsetCommitRequest.DEFAULT_RETENTION_TIME => None
+              case retentionTime => Some(currentTimestamp + retentionTime)
             }
           )
         }
@@ -1912,7 +1904,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       topicPartition -> new OffsetAndMetadata(
         offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
         commitTimestamp = currentTimestamp,
-        expireTimestamp = defaultExpireTimestamp)
+        expireTimestamp = Some(defaultExpireTimestamp))
     }
   }
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index c55f6c4..7e2c564 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -368,6 +368,10 @@ object ConsoleConsumer extends Logging {
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, group)
       case None =>
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, s"console-consumer-${new Random().nextInt(100000)}")
+        // By default, avoid unnecessary expansion of the coordinator cache since
+        // the auto-generated group and its offsets are not intended to be used again
+        if (!consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
+          consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
         groupIdPassed = false
     }
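
Users who relied on the console consumer committing offsets under its auto-generated group can still opt back in explicitly, e.g. (an illustrative invocation of the standard tool):

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo \
      --consumer-property enable.auto.commit=true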
 
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 17fbd8f..1792c7b 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -29,7 +29,7 @@ import kafka.utils._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.record._
-import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection.{Map, mutable}
 import scala.collection.mutable.ArrayBuffer
@@ -321,7 +321,7 @@ object DumpLogSegments {
 
     private def parseGroupMetadata(groupMetadataKey: GroupMetadataKey, payload: ByteBuffer) = {
       val groupId = groupMetadataKey.key
-      val group = GroupMetadataManager.readGroupMessageValue(groupId, payload)
+      val group = GroupMetadataManager.readGroupMessageValue(groupId, payload, Time.SYSTEM)
       val protocolType = group.protocolType.getOrElse("")
 
       val assignment = group.allMemberMetadata.map { member =>
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index f4ff8e1..69b6525 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -334,7 +334,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def createOffsetCommitRequest = {
     new requests.OffsetCommitRequest.Builder(
       group, Map(tp -> new requests.OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
-      setMemberId("").setGenerationId(1).setRetentionTime(1000).
+      setMemberId("").setGenerationId(1).
       build()
   }
 
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index 32e4085..2befc8f 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -79,6 +79,9 @@ class ApiVersionTest {
     assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0"))
     assertEquals(KAFKA_2_0_IV0, ApiVersion("2.0-IV0"))
     assertEquals(KAFKA_2_0_IV1, ApiVersion("2.0-IV1"))
+
+    assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1"))
+    assertEquals(KAFKA_2_1_IV0, ApiVersion("2.1-IV0"))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 3bfacab..77e6fdc 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -17,20 +17,22 @@
 
 package kafka.coordinator.group
 
-import kafka.api.ApiVersion
+import kafka.api.{ApiVersion, KAFKA_1_1_IV0, KAFKA_2_1_IV0}
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.log.{Log, LogAppendInfo}
 import kafka.server.{FetchDataInfo, KafkaConfig, LogOffsetMetadata, ReplicaManager}
 import kafka.utils.TestUtils.fail
 import kafka.utils.{KafkaScheduler, MockTime, TestUtils}
+import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
+import org.apache.kafka.clients.consumer.internals.PartitionAssignor.Subscription
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.easymock.{Capture, EasyMock, IAnswer}
-import org.junit.Assert.{assertEquals, assertFalse, assertTrue, assertNull}
+import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
 
@@ -52,6 +54,7 @@ class GroupMetadataManagerTest {
   var scheduler: KafkaScheduler = null
   var zkClient: KafkaZkClient = null
   var partition: Partition = null
+  var defaultOffsetRetentionMs = Long.MaxValue
 
   val groupId = "foo"
   val groupPartitionId = 0
@@ -75,6 +78,8 @@ class GroupMetadataManagerTest {
       offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
       offsetCommitRequiredAcks = config.offsetCommitRequiredAcks)
 
+    defaultOffsetRetentionMs = offsetConfig.offsetsRetentionMs
+
     // make two partitions of the group topic to make sure some partitions are not owned by the coordinator
     zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
@@ -506,7 +511,7 @@ class GroupMetadataManagerTest {
     // group is owned but does not exist yet
     assertTrue(groupMetadataManager.groupNotExists(groupId))
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     // group is owned but not Dead
@@ -616,6 +621,7 @@ class GroupMetadataManagerTest {
     assertEquals(committedOffsets.size, group.allOffsets.size)
     committedOffsets.foreach { case (topicPartition, offset) =>
       assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+      assertTrue(group.offset(topicPartition).map(_.expireTimestamp).contains(None))
     }
   }
 
@@ -729,9 +735,9 @@ class GroupMetadataManagerTest {
 
   @Test
   def testAddGroup() {
-    val group = new GroupMetadata("foo", initialState = Empty)
+    val group = new GroupMetadata("foo", Empty, time)
     assertEquals(group, groupMetadataManager.addGroup(group))
-    assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", initialState = Empty)))
+    assertEquals(group, groupMetadataManager.addGroup(new GroupMetadata("foo", Empty, time)))
   }
 
   @Test
@@ -739,7 +745,7 @@ class GroupMetadataManagerTest {
     val generation = 27
     val protocolType = "consumer"
 
-    val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
+    val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
     groupMetadataManager.addGroup(group)
 
     val capturedRecords = expectAppendMessage(Errors.NONE)
@@ -758,7 +764,7 @@ class GroupMetadataManagerTest {
     assertEquals(1, records.size)
 
     val record = records.head
-    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
     assertTrue(groupMetadata.is(Empty))
     assertEquals(generation, groupMetadata.generationId)
     assertEquals(Some(protocolType), groupMetadata.protocolType)
@@ -766,7 +772,7 @@ class GroupMetadataManagerTest {
 
   @Test
   def testStoreEmptySimpleGroup() {
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val capturedRecords = expectAppendMessage(Errors.NONE)
@@ -787,7 +793,7 @@ class GroupMetadataManagerTest {
     assertEquals(1, records.size)
 
     val record = records.head
-    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value)
+    val groupMetadata = GroupMetadataManager.readGroupMessageValue(groupId, record.value, time)
     assertTrue(groupMetadata.is(Empty))
     assertEquals(0, groupMetadata.generationId)
     assertEquals(None, groupMetadata.protocolType)
@@ -809,7 +815,7 @@ class GroupMetadataManagerTest {
   private def assertStoreGroupErrorMapping(appendError: Errors, expectedError: Errors) {
     EasyMock.reset(replicaManager)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     expectAppendMessage(appendError)
@@ -832,7 +838,7 @@ class GroupMetadataManagerTest {
     val clientId = "clientId"
     val clientHost = "localhost"
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
@@ -863,7 +869,7 @@ class GroupMetadataManagerTest {
     val clientId = "clientId"
     val clientHost = "localhost"
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
 
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
       protocolType, List(("protocol", Array[Byte]())))
@@ -893,7 +899,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -935,7 +941,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -975,7 +981,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1014,7 +1020,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1052,7 +1058,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1094,7 +1100,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
@@ -1132,7 +1138,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     // expire the offset after 1 millisecond
@@ -1185,7 +1191,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
     group.generationId = 5
 
@@ -1233,7 +1239,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
     group.generationId = 5
 
@@ -1287,7 +1293,7 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
     // expire the offset after 1 millisecond
@@ -1348,31 +1354,39 @@ class GroupMetadataManagerTest {
   }
 
   @Test
-  def testExpireOffsetsWithActiveGroup() {
+  def testOffsetExpirationSemantics() {
     val memberId = "memberId"
     val clientId = "clientId"
     val clientHost = "localhost"
-    val topicPartition1 = new TopicPartition("foo", 0)
-    val topicPartition2 = new TopicPartition("foo", 1)
+    val topic = "foo"
+    val topicPartition1 = new TopicPartition(topic, 0)
+    val topicPartition2 = new TopicPartition(topic, 1)
+    val topicPartition3 = new TopicPartition(topic, 2)
     val offset = 37
 
     groupMetadataManager.addPartitionOwnership(groupPartitionId)
 
-    val group = new GroupMetadata(groupId, initialState = Empty)
+    val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
+    val subscription = new Subscription(List(topic).asJava)
     val member = new MemberMetadata(memberId, groupId, clientId, clientHost, rebalanceTimeout, sessionTimeout,
-      protocolType, List(("protocol", Array[Byte]())))
+      protocolType, List(("protocol", ConsumerProtocol.serializeSubscription(subscription).array())))
     member.awaitingJoinCallback = _ => ()
     group.add(member)
     group.transitionTo(PreparingRebalance)
     group.initNextGeneration()
 
-    // expire the offset after 1 millisecond
     val startMs = time.milliseconds
+    // old clients, expiry timestamp is explicitly set
+    val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 1)
+    val tp2OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs, startMs + 3)
+    // new clients, no per-partition expiry timestamp, offsets of group expire together
+    val tp3OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
     val offsets = immutable.Map(
-      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
-      topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
+      topicPartition1 -> tp1OffsetAndMetadata,
+      topicPartition2 -> tp2OffsetAndMetadata,
+      topicPartition3 -> tp3OffsetAndMetadata)
 
     mockGetPartition()
     expectAppendMessage(Errors.NONE)
@@ -1389,8 +1403,26 @@ class GroupMetadataManagerTest {
     assertFalse(commitErrors.isEmpty)
     assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
 
-    // expire all of the offsets
-    time.sleep(4)
+    // no offsets should expire, even though the expiration timestamp of one has been reached, because the group is still active
+    time.sleep(2)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // group and offsets should still be there
+    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+    assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
+    assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
+    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
+
+    var cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+
+    group.transitionTo(PreparingRebalance)
+    group.transitionTo(Empty)
 
     // expect the offset tombstone
     EasyMock.reset(partition)
@@ -1401,16 +1433,245 @@ class GroupMetadataManagerTest {
 
     groupMetadataManager.cleanupGroupMetadata()
 
-    // group should still be there, but the offsets should be gone
+    // group is empty now, only one offset should expire
+    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+    assertEquals(None, group.offset(topicPartition1))
+    assertEquals(Some(tp2OffsetAndMetadata), group.offset(topicPartition2))
+    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
+
+    cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition2).map(_.offset))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+
+    time.sleep(2)
+
+    // expect the offset tombstone
+    EasyMock.reset(partition)
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // one more offset should expire
     assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
     assertEquals(None, group.offset(topicPartition1))
     assertEquals(None, group.offset(topicPartition2))
+    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
 
-    val cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2)))
+    cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+
+    // advance time to just before the last partition's offset is due to expire; no offset should expire yet
+    time.sleep(group.currentStateTimestamp.get + defaultOffsetRetentionMs - time.milliseconds() - 1)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // no offsets should have expired; the group and its remaining offset should still be there
+    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+    assertEquals(None, group.offset(topicPartition1))
+    assertEquals(None, group.offset(topicPartition2))
+    assertEquals(Some(tp3OffsetAndMetadata), group.offset(topicPartition3))
+
+    cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition3).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+
+    // advance time far enough for the last offset to expire
+    time.sleep(2)
+
+    // expect the offset tombstone
+    EasyMock.reset(partition)
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // group and all its offsets should be gone now
+    assertEquals(None, groupMetadataManager.getGroup(groupId))
+    assertEquals(None, group.offset(topicPartition1))
+    assertEquals(None, group.offset(topicPartition2))
+    assertEquals(None, group.offset(topicPartition3))
+
+    cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1, topicPartition2, topicPartition3)))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
     assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition2).map(_.offset))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition3).map(_.offset))
 
     EasyMock.verify(replicaManager)
+
+    assert(group.is(Dead))
+  }
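
The expiration rule this test walks through can be condensed into a short sketch. The helper name shouldExpire, the emptySinceMs parameter, and the flattened signature are assumptions for illustration only, not the actual GroupMetadataManager code:

    // Sketch of the KIP-211 expiry rule exercised above (illustrative names,
    // not the real implementation).
    def shouldExpire(expireTimestamp: Option[Long], // explicitly set at commit time by old clients
                     groupIsEmpty: Boolean,
                     emptySinceMs: Option[Long],    // when the group last transitioned to Empty
                     offsetRetentionMs: Long,
                     now: Long): Boolean = {
      if (!groupIsEmpty)
        false // offsets of an active group never expire
      else expireTimestamp match {
        case Some(explicit) => now >= explicit // old clients: honor the per-partition expiry timestamp
        case None => emptySinceMs.exists(now - _ >= offsetRetentionMs) // new clients: expire together
      }
    }
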
+
+  @Test
+  def testOffsetExpirationOfSimpleConsumer() {
+    val memberId = "memberId"
+    val clientId = "clientId"
+    val clientHost = "localhost"
+    val topic = "foo"
+    val topicPartition1 = new TopicPartition(topic, 0)
+    val offset = 37
+
+    groupMetadataManager.addPartitionOwnership(groupPartitionId)
+
+    val group = new GroupMetadata(groupId, Empty, time)
+    groupMetadataManager.addGroup(group)
+
+    val startMs = time.milliseconds
+    // no per-partition expiry timestamp is set, so the offset expires after the default retention period
+    val tp1OffsetAndMetadata = OffsetAndMetadata(offset, "", startMs)
+    val offsets = immutable.Map(
+      topicPartition1 -> tp1OffsetAndMetadata)
+
+    mockGetPartition()
+    expectAppendMessage(Errors.NONE)
+    EasyMock.replay(replicaManager)
+
+    var commitErrors: Option[immutable.Map[TopicPartition, Errors]] = None
+    def callback(errors: immutable.Map[TopicPartition, Errors]) {
+      commitErrors = Some(errors)
+    }
+
+    groupMetadataManager.storeOffsets(group, memberId, offsets, callback)
+    assertTrue(group.hasOffsets)
+
+    assertFalse(commitErrors.isEmpty)
+    assertEquals(Some(Errors.NONE), commitErrors.get.get(topicPartition1))
+
+    // offsets should not expire while still within the retention period measured from the commit timestamp
+    val expiryTimestamp = offsets.get(topicPartition1).get.commitTimestamp + defaultOffsetRetentionMs
+    time.sleep(expiryTimestamp - time.milliseconds() - 1)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // group and offsets should still be there
+    assertEquals(Some(group), groupMetadataManager.getGroup(groupId))
+    assertEquals(Some(tp1OffsetAndMetadata), group.offset(topicPartition1))
+
+    var cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1)))
+    assertEquals(Some(offset), cachedOffsets.get(topicPartition1).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+
+    // advance time far enough for the offset to expire
+    time.sleep(2)
+
+    // expect the offset tombstone
+    EasyMock.reset(partition)
+    EasyMock.expect(partition.appendRecordsToLeader(EasyMock.anyObject(classOf[MemoryRecords]),
+      isFromClient = EasyMock.eq(false), requiredAcks = EasyMock.anyInt()))
+      .andReturn(LogAppendInfo.UnknownLogAppendInfo)
+    EasyMock.replay(partition)
+
+    groupMetadataManager.cleanupGroupMetadata()
+
+    // group and all its offsets should be gone now
+    assertEquals(None, groupMetadataManager.getGroup(groupId))
+    assertEquals(None, group.offset(topicPartition1))
+
+    cachedOffsets = groupMetadataManager.getOffsets(groupId, Some(Seq(topicPartition1)))
+    assertEquals(Some(OffsetFetchResponse.INVALID_OFFSET), cachedOffsets.get(topicPartition1).map(_.offset))
+
+    EasyMock.verify(replicaManager)
+
+    assert(group.is(Dead))
+  }
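
The standalone case differs only in the anchor point: with no members or state transitions, retention is measured from each offset's commit timestamp, as this minimal sketch (same illustrative style as above, not the actual implementation) shows:

    // Sketch: a standalone (simple) consumer group has no member state to anchor
    // on, so retention runs from the commit timestamp of each offset.
    def simpleConsumerOffsetExpired(commitTimestamp: Long,
                                    offsetRetentionMs: Long,
                                    now: Long): Boolean =
      now - commitTimestamp >= offsetRetentionMs
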
+
+  @Test
+  def testLoadOffsetFromOldCommit() = {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val startOffset = 15L
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val apiVersion = KAFKA_1_1_IV0
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTime = Some(100))
+    val memberId = "98098230493"
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Stable, group.currentState)
+    assertEquals(memberId, group.leaderOrNull)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertEquals(protocol, group.protocolOrNull)
+    assertEquals(Set(memberId), group.allMembers)
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+      assertTrue(group.offset(topicPartition).map(_.expireTimestamp).get.nonEmpty)
+    }
+  }
+
+  @Test
+  def testLoadOffsetWithExplicitRetention() = {
+    val groupMetadataTopicPartition = groupTopicPartition
+    val generation = 935
+    val protocolType = "consumer"
+    val protocol = "range"
+    val startOffset = 15L
+    val committedOffsets = Map(
+      new TopicPartition("foo", 0) -> 23L,
+      new TopicPartition("foo", 1) -> 455L,
+      new TopicPartition("bar", 0) -> 8992L
+    )
+
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTime = Some(100))
+    val memberId = "98098230493"
+    val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
+    val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
+
+    expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Stable, group.currentState)
+    assertEquals(memberId, group.leaderOrNull)
+    assertEquals(generation, group.generationId)
+    assertEquals(Some(protocolType), group.protocolType)
+    assertEquals(protocol, group.protocolOrNull)
+    assertEquals(Set(memberId), group.allMembers)
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+      assertTrue(group.offset(topicPartition).map(_.expireTimestamp).get.nonEmpty)
+    }
   }
 
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
@@ -1452,20 +1713,21 @@ class GroupMetadataManagerTest {
   private def buildStableGroupRecordWithMember(generation: Int,
                                                protocolType: String,
                                                protocol: String,
-                                               memberId: String): SimpleRecord = {
+                                               memberId: String,
+                                               apiVersion: ApiVersion = ApiVersion.latestVersion): SimpleRecord = {
     val memberProtocols = List((protocol, Array.emptyByteArray))
     val member = new MemberMetadata(memberId, groupId, "clientId", "clientHost", 30000, 10000, protocolType, memberProtocols)
-    val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol,
-      leaderId = memberId, Seq(member))
+    val group = GroupMetadata.loadGroup(groupId, Stable, generation, protocolType, protocol, memberId,
+      if (apiVersion >= KAFKA_2_1_IV0) Some(time.milliseconds()) else None, Seq(member), time)
     val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
-    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]))
+    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map(memberId -> Array.empty[Byte]), apiVersion)
     new SimpleRecord(groupMetadataKey, groupMetadataValue)
   }
 
   private def buildEmptyGroupRecord(generation: Int, protocolType: String): SimpleRecord = {
-    val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, Seq.empty)
+    val group = GroupMetadata.loadGroup(groupId, Empty, generation, protocolType, null, null, None, Seq.empty, time)
     val groupMetadataKey = GroupMetadataManager.groupMetadataKey(groupId)
-    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty)
+    val groupMetadataValue = GroupMetadataManager.groupMetadataValue(group, Map.empty, ApiVersion.latestVersion)
     new SimpleRecord(groupMetadataKey, groupMetadataValue)
   }
 
@@ -1511,11 +1773,19 @@ class GroupMetadataManagerTest {
   }
 
   private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
-                                           groupId: String = groupId): Seq[SimpleRecord] = {
+                                           groupId: String = groupId,
+                                           apiVersion: ApiVersion = ApiVersion.latestVersion,
+                                           retentionTime: Option[Long] = None): Seq[SimpleRecord] = {
     committedOffsets.map { case (topicPartition, offset) =>
-      val offsetAndMetadata = OffsetAndMetadata(offset)
+      val offsetAndMetadata = retentionTime match {
+        case Some(timestamp) =>
+          val commitTimestamp = time.milliseconds()
+          OffsetAndMetadata(offset, "", commitTimestamp, commitTimestamp + timestamp)
+        case None =>
+          OffsetAndMetadata(offset)
+      }
       val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
-      val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata)
+      val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
       new SimpleRecord(offsetCommitKey, offsetCommitValue)
     }.toSeq
   }
@@ -1542,7 +1812,7 @@ class GroupMetadataManagerTest {
   def testMetrics() {
     groupMetadataManager.cleanupGroupMetadata()
     expectMetrics(groupMetadataManager, 0, 0, 0)
-    val group = new GroupMetadata("foo2", Stable)
+    val group = new GroupMetadata("foo2", Stable, time)
     groupMetadataManager.addGroup(group)
     expectMetrics(groupMetadataManager, 1, 0, 0)
     group.transitionTo(PreparingRebalance)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index 183860f..9054533 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -18,9 +18,8 @@
 package kafka.coordinator.group
 
 import kafka.common.OffsetAndMetadata
-
 import org.apache.kafka.common.TopicPartition
-
+import org.apache.kafka.common.utils.Time
 import org.junit.Assert._
 import org.junit.{Before, Test}
 import org.scalatest.junit.JUnitSuite
@@ -40,7 +39,7 @@ class GroupMetadataTest extends JUnitSuite {
 
   @Before
   def setUp() {
-    group = new GroupMetadata("groupId", initialState = Empty)
+    group = new GroupMetadata("groupId", Empty, Time.SYSTEM)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 0205bcf..d91e008 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -240,7 +240,7 @@ class RequestQuotaTest extends BaseRequestTest {
         case ApiKeys.OFFSET_COMMIT =>
           new OffsetCommitRequest.Builder("test-group",
             Map(tp -> new OffsetCommitRequest.PartitionData(0, "metadata")).asJava).
-            setMemberId("").setGenerationId(1).setRetentionTime(1000)
+            setMemberId("").setGenerationId(1)
 
         case ApiKeys.OFFSET_FETCH =>
           new OffsetFetchRequest.Builder("test-group", List(tp).asJava)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index cb246f6..26c0d15 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -19,6 +19,18 @@
 
 <script id="upgrade-template" type="text/x-handlebars-template">
 
+<h4><a id="upgrade_2_1_0" href="#upgrade_2_1_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, 1.1.x, or 2.0.0 to 2.1.0</a></h4>
+<p><b>Additional Upgrade Notes:</b></p>
+
+<ol>
+    <li>Offset expiration semantics have slightly changed in this version. According to the new semantics, offsets of partitions in a group will
+        not be removed while the group is subscribed to the corresponding topic and is still active (has active consumers). If the group becomes
+        empty, all its offsets will be removed after the default offset retention period (or the one set by the broker) has passed, unless the
+        group becomes active again. Offsets associated with standalone (simple) consumers, which do not use Kafka group management, will be removed
+        after the default offset retention period (or the one set by the broker) has passed since their last commit. The relevant broker setting is
+        shown after this list.</li>
+</ol>
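
The retention period the note above refers to is governed by the broker's offsets.retention.minutes setting; a minimal server.properties sketch (the 7-day value is only an example, not necessarily the shipped default):

    # server.properties: keep committed offsets of empty groups for 7 days
    offsets.retention.minutes=10080
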
+
+
 <h4><a id="upgrade_2_0_0" href="#upgrade_2_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x, 0.10.2.x, 0.11.0.x, 1.0.x, or 1.1.x to 2.0.0</a></h4>
 <p>Kafka 2.0.0 introduces wire protocol changes. By following the recommended rolling upgrade plan below,
     you guarantee no downtime during the upgrade. However, please review the <a href="#upgrade_200_notable">notable changes in 2.0.0</a> before upgrading.
@@ -66,6 +78,9 @@
         until all brokers in the cluster have been updated.
         <p><b>NOTE:</b> any prefixed ACLs added to a cluster, even after the cluster is fully upgraded, will be ignored should the cluster be downgraded again.
     </li>
+    <li>The default value of the console consumer's <code>enable.auto.commit</code> property, when no <code>group.id</code> is provided, is now <code>false</code>.
+        This avoids polluting the consumer coordinator cache, as the auto-generated group is unlikely to be used by other consumers. A sketch of
+        re-enabling auto-commit follows this list.
+    </li>
 </ol>
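
As referenced in the note above, auto-commit can still be re-enabled explicitly for a group-less console consumer via the standard consumer property pass-through; a sketch (topic name and bootstrap address are placeholders):

    # re-enable auto-commit for a console consumer running without a group.id
    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic foo \
      --consumer-property enable.auto.commit=true
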
 
 <h5><a id="upgrade_200_notable" href="#upgrade_200_notable">Notable changes in 2.0.0</a></h5>
@@ -112,9 +127,9 @@
         <code>beginningOffsets</code>, <code>endOffsets</code> and <code>close</code> that take in a <code>Duration</code>.</li>
     <li>Also as part of KIP-266, the default value of <code>request.timeout.ms</code> has been changed to 30 seconds.
         The previous value was a little higher than 5 minutes to account for maximum time that a rebalance would take.
-        Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from 
+        Now we treat the JoinGroup request in the rebalance as a special case and use a value derived from
         <code>max.poll.interval.ms</code> for the request timeout. All other request types use the timeout defined
-        by <code>request.timeout.ms</code></li> 
+        by <code>request.timeout.ms</code></li>
     <li>The internal method <code>kafka.admin.AdminClient.deleteRecordsBefore</code> has been removed. Users are encouraged to migrate to <code>org.apache.kafka.clients.admin.AdminClient.deleteRecords</code>.</li>
     <li>The AclCommand tool <code>--producer</code> convenience option uses the <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-277+-+Fine+Grained+ACL+for+CreateTopics+API">KIP-277</a> finer grained ACL on the given topic. </li>
     <li><a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-176%3A+Remove+deprecated+new-consumer+option+for+tools">KIP-176</a> removes