You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/09/26 13:26:07 UTC

[kafka] branch trunk updated: KAFKA-7437; Persist leader epoch in offset commit metadata (#5689)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f7267d  KAFKA-7437; Persist leader epoch in offset commit metadata (#5689)
9f7267d is described below

commit 9f7267dd2fedde86bf15aabdbc5256e5fc617184
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Sep 26 06:25:55 2018 -0700

    KAFKA-7437; Persist leader epoch in offset commit metadata (#5689)
    
    This commit implements the changes described in KIP-320 for the persistence of leader epoch information in the offset commit protocol.
    
    Reviewers:  Rajini Sivaram <ra...@googlemail.com>
---
 .../scala/kafka/common/OffsetAndMetadata.scala     |  52 +++++++++++
 .../kafka/common/OffsetMetadataAndError.scala      |  81 -----------------
 .../coordinator/group/GroupMetadataManager.scala   |  89 +++++++++++-------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  18 ++--
 .../kafka/api/PlaintextConsumerTest.scala          |   4 +-
 .../integration/kafka/api/TransactionsTest.scala   |  29 +++++-
 .../group/GroupCoordinatorConcurrencyTest.scala    |  14 +--
 .../coordinator/group/GroupCoordinatorTest.scala   |  54 +++++++----
 .../group/GroupMetadataManagerTest.scala           | 101 +++++++++++++++++----
 .../coordinator/group/GroupMetadataTest.scala      |  31 ++++---
 10 files changed, 290 insertions(+), 183 deletions(-)

diff --git a/core/src/main/scala/kafka/common/OffsetAndMetadata.scala b/core/src/main/scala/kafka/common/OffsetAndMetadata.scala
new file mode 100644
index 0000000..5f0c080
--- /dev/null
+++ b/core/src/main/scala/kafka/common/OffsetAndMetadata.scala
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+import java.util.Optional
+
+case class OffsetAndMetadata(offset: Long,
+                             leaderEpoch: Optional[Integer],
+                             metadata: String,
+                             commitTimestamp: Long,
+                             expireTimestamp: Option[Long]) {
+
+
+  override def toString: String  = {
+    s"OffsetAndMetadata(offset=$offset" +
+      s", leaderEpoch=$leaderEpoch" +
+      s", metadata=$metadata" +
+      s", commitTimestamp=$commitTimestamp" +
+      s", expireTimestamp=$expireTimestamp)"
+  }
+}
+
+object OffsetAndMetadata {
+  val NoMetadata: String = ""
+
+  def apply(offset: Long, metadata: String, commitTimestamp: Long): OffsetAndMetadata = {
+    OffsetAndMetadata(offset, Optional.empty(), metadata, commitTimestamp, None)
+  }
+
+  def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long): OffsetAndMetadata = {
+    OffsetAndMetadata(offset, Optional.empty(), metadata, commitTimestamp, Some(expireTimestamp))
+  }
+
+  def apply(offset: Long, leaderEpoch: Optional[Integer], metadata: String, commitTimestamp: Long): OffsetAndMetadata = {
+    OffsetAndMetadata(offset, leaderEpoch, metadata, commitTimestamp, None)
+  }
+}
diff --git a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala b/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
deleted file mode 100644
index afe542c..0000000
--- a/core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-import org.apache.kafka.common.protocol.Errors
-
-case class OffsetMetadata(offset: Long, metadata: String = OffsetMetadata.NoMetadata) {
-  override def toString = "OffsetMetadata[%d,%s]"
-    .format(offset,
-    if (metadata != null && metadata.length > 0) metadata else "NO_METADATA")
-}
-
-object OffsetMetadata {
-  val InvalidOffset: Long = -1L
-  val NoMetadata: String = ""
-
-  val InvalidOffsetMetadata = OffsetMetadata(OffsetMetadata.InvalidOffset, OffsetMetadata.NoMetadata)
-}
-
-case class OffsetAndMetadata(offsetMetadata: OffsetMetadata,
-                             commitTimestamp: Long = org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP,
-                             expireTimestamp: Option[Long] = None) {
-
-  def offset = offsetMetadata.offset
-
-  def metadata = offsetMetadata.metadata
-
-  override def toString = s"[$offsetMetadata,CommitTime $commitTimestamp,ExpirationTime ${expireTimestamp.getOrElse("_")}]"
-}
-
-object OffsetAndMetadata {
-  def apply(offset: Long, metadata: String, commitTimestamp: Long, expireTimestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), commitTimestamp, Some(expireTimestamp))
-
-  def apply(offset: Long, metadata: String, timestamp: Long) = new OffsetAndMetadata(OffsetMetadata(offset, metadata), timestamp)
-
-  def apply(offset: Long, metadata: String) = new OffsetAndMetadata(OffsetMetadata(offset, metadata))
-
-  def apply(offset: Long) = new OffsetAndMetadata(OffsetMetadata(offset, OffsetMetadata.NoMetadata))
-}
-
-case class OffsetMetadataAndError(offsetMetadata: OffsetMetadata, error: Errors = Errors.NONE) {
-  def offset = offsetMetadata.offset
-
-  def metadata = offsetMetadata.metadata
-
-  override def toString = "[%s, Error=%s]".format(offsetMetadata, error)
-}
-
-object OffsetMetadataAndError {
-  val NoOffset = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NONE)
-  val GroupLoading = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_LOAD_IN_PROGRESS)
-  val UnknownMember = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_MEMBER_ID)
-  val NotCoordinatorForGroup = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.NOT_COORDINATOR)
-  val GroupCoordinatorNotAvailable = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.COORDINATOR_NOT_AVAILABLE)
-  val UnknownTopicOrPartition = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.UNKNOWN_TOPIC_OR_PARTITION)
-  val IllegalGroupGenerationId = OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, Errors.ILLEGAL_GENERATION)
-
-  def apply(offset: Long) = new OffsetMetadataAndError(OffsetMetadata(offset, OffsetMetadata.NoMetadata), Errors.NONE)
-
-  def apply(error: Errors) = new OffsetMetadataAndError(OffsetMetadata.InvalidOffsetMetadata, error)
-
-  def apply(offset: Long, metadata: String, error: Errors) = new OffsetMetadataAndError(OffsetMetadata(offset, metadata), error)
-}
-
-
-
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index dba8b4e..21f4a3d 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
 
 import com.yammer.metrics.core.Gauge
-import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0}
+import kafka.api.{ApiVersion, KAFKA_0_10_1_IV0, KAFKA_2_1_IV0, KAFKA_2_1_IV1}
 import kafka.common.{MessageFormatter, OffsetAndMetadata}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.ReplicaManager
@@ -460,7 +460,7 @@ class GroupMetadataManager(brokerId: Int,
               // that commit offsets to Kafka.)
               group.allOffsets.map { case (topicPartition, offsetAndMetadata) =>
                 topicPartition -> new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
-                  Optional.empty(), offsetAndMetadata.metadata, Errors.NONE)
+                  offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
               }
 
             case Some(topicPartitions) =>
@@ -471,7 +471,7 @@ class GroupMetadataManager(brokerId: Int,
                       Optional.empty(), "", Errors.NONE)
                   case Some(offsetAndMetadata) =>
                     new OffsetFetchResponse.PartitionData(offsetAndMetadata.offset,
-                      Optional.empty(), offsetAndMetadata.metadata, Errors.NONE)
+                      offsetAndMetadata.leaderEpoch, offsetAndMetadata.metadata, Errors.NONE)
                 }
                 topicPartition -> partitionData
               }.toMap
@@ -960,6 +960,16 @@ object GroupMetadataManager {
   private val OFFSET_VALUE_METADATA_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("metadata")
   private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2 = OFFSET_COMMIT_VALUE_SCHEMA_V2.get("commit_timestamp")
 
+  private val OFFSET_COMMIT_VALUE_SCHEMA_V3 = new Schema(
+    new Field("offset", INT64),
+    new Field("leader_epoch", INT32),
+    new Field("metadata", STRING, "Associated metadata.", ""),
+    new Field("commit_timestamp", INT64))
+  private val OFFSET_VALUE_OFFSET_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("offset")
+  private val OFFSET_VALUE_LEADER_EPOCH_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("leader_epoch")
+  private val OFFSET_VALUE_METADATA_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("metadata")
+  private val OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3 = OFFSET_COMMIT_VALUE_SCHEMA_V3.get("commit_timestamp")
+
   private val GROUP_METADATA_KEY_SCHEMA = new Schema(new Field("group", STRING))
   private val GROUP_KEY_GROUP_FIELD = GROUP_METADATA_KEY_SCHEMA.get("group")
 
@@ -1019,7 +1029,6 @@ object GroupMetadataManager {
     new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
     new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
 
-
   // map of versions to key schemas as data types
   private val MESSAGE_TYPE_SCHEMAS = Map(
     0 -> OFFSET_COMMIT_KEY_SCHEMA,
@@ -1030,21 +1039,18 @@ object GroupMetadataManager {
   private val OFFSET_VALUE_SCHEMAS = Map(
     0 -> OFFSET_COMMIT_VALUE_SCHEMA_V0,
     1 -> OFFSET_COMMIT_VALUE_SCHEMA_V1,
-    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2)
+    2 -> OFFSET_COMMIT_VALUE_SCHEMA_V2,
+    3 -> OFFSET_COMMIT_VALUE_SCHEMA_V3)
 
   // map of version of group metadata value schemas
   private val GROUP_VALUE_SCHEMAS = Map(
     0 -> GROUP_METADATA_VALUE_SCHEMA_V0,
     1 -> GROUP_METADATA_VALUE_SCHEMA_V1,
     2 -> GROUP_METADATA_VALUE_SCHEMA_V2)
-  private val CURRENT_GROUP_VALUE_SCHEMA_VERSION = 2.toShort
 
   private val CURRENT_OFFSET_KEY_SCHEMA = schemaForKey(CURRENT_OFFSET_KEY_SCHEMA_VERSION)
   private val CURRENT_GROUP_KEY_SCHEMA = schemaForKey(CURRENT_GROUP_KEY_SCHEMA_VERSION)
 
-  private val CURRENT_OFFSET_VALUE_SCHEMA = schemaForOffset(2)
-  private val CURRENT_GROUP_VALUE_SCHEMA = schemaForGroup(CURRENT_GROUP_VALUE_SCHEMA_VERSION)
-
   private def schemaForKey(version: Int) = {
     val schemaOpt = MESSAGE_TYPE_SCHEMAS.get(version)
     schemaOpt match {
@@ -1053,7 +1059,7 @@ object GroupMetadataManager {
     }
   }
 
-  private def schemaForOffset(version: Int) = {
+  private def schemaForOffsetValue(version: Int) = {
     val schemaOpt = OFFSET_VALUE_SCHEMAS.get(version)
     schemaOpt match {
       case Some(schema) => schema
@@ -1061,7 +1067,7 @@ object GroupMetadataManager {
     }
   }
 
-  private def schemaForGroup(version: Int) = {
+  private def schemaForGroupValue(version: Int) = {
     val schemaOpt = GROUP_VALUE_SCHEMAS.get(version)
     schemaOpt match {
       case Some(schema) => schema
@@ -1074,8 +1080,8 @@ object GroupMetadataManager {
    *
    * @return key for offset commit message
    */
-  private[group] def offsetCommitKey(group: String, topicPartition: TopicPartition,
-                                           versionId: Short = 0): Array[Byte] = {
+  private[group] def offsetCommitKey(group: String,
+                                     topicPartition: TopicPartition): Array[Byte] = {
     val key = new Struct(CURRENT_OFFSET_KEY_SCHEMA)
     key.set(OFFSET_KEY_GROUP_FIELD, group)
     key.set(OFFSET_KEY_TOPIC_FIELD, topicPartition.topic)
@@ -1113,27 +1119,34 @@ object GroupMetadataManager {
                                        apiVersion: ApiVersion): Array[Byte] = {
     // generate commit value according to schema version
     val (version, value) = {
-      if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty)
-        // if an older version of the API is used, or if an explicit expiration is provided, use the older schema
-        (1.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1))
-      else
-        (2.toShort, new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2))
-    }
-
-    if (version == 2) {
-      value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
-      value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
-      value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
-    } else {
-      value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
-      value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
-      value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
-      // version 1 has a non empty expireTimestamp field
-      value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1, offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
+      if (apiVersion < KAFKA_2_1_IV0 || offsetAndMetadata.expireTimestamp.nonEmpty) {
+        val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V1)
+        value.set(OFFSET_VALUE_OFFSET_FIELD_V1, offsetAndMetadata.offset)
+        value.set(OFFSET_VALUE_METADATA_FIELD_V1, offsetAndMetadata.metadata)
+        value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V1, offsetAndMetadata.commitTimestamp)
+        // version 1 has a non empty expireTimestamp field
+        value.set(OFFSET_VALUE_EXPIRE_TIMESTAMP_FIELD_V1,
+          offsetAndMetadata.expireTimestamp.getOrElse(OffsetCommitRequest.DEFAULT_TIMESTAMP))
+        (1, value)
+      } else if (apiVersion < KAFKA_2_1_IV1) {
+        val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V2)
+        value.set(OFFSET_VALUE_OFFSET_FIELD_V2, offsetAndMetadata.offset)
+        value.set(OFFSET_VALUE_METADATA_FIELD_V2, offsetAndMetadata.metadata)
+        value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2, offsetAndMetadata.commitTimestamp)
+        (2, value)
+      } else {
+        val value = new Struct(OFFSET_COMMIT_VALUE_SCHEMA_V3)
+        value.set(OFFSET_VALUE_OFFSET_FIELD_V3, offsetAndMetadata.offset)
+        value.set(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3,
+          offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
+        value.set(OFFSET_VALUE_METADATA_FIELD_V3, offsetAndMetadata.metadata)
+        value.set(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3, offsetAndMetadata.commitTimestamp)
+        (3, value)
+      }
     }
 
     val byteBuffer = ByteBuffer.allocate(2 /* version */ + value.sizeOf)
-    byteBuffer.putShort(version)
+    byteBuffer.putShort(version.toShort)
     value.writeTo(byteBuffer)
     byteBuffer.array()
   }
@@ -1157,7 +1170,7 @@ object GroupMetadataManager {
       else if (apiVersion < KAFKA_2_1_IV0)
         (1.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V1))
       else
-        (2.toShort, new Struct(CURRENT_GROUP_VALUE_SCHEMA))
+        (2.toShort, new Struct(GROUP_METADATA_VALUE_SCHEMA_V2))
     }
 
     value.set(PROTOCOL_TYPE_KEY, groupMetadata.protocolType.getOrElse(""))
@@ -1242,7 +1255,7 @@ object GroupMetadataManager {
       null
     } else {
       val version = buffer.getShort
-      val valueSchema = schemaForOffset(version)
+      val valueSchema = schemaForOffsetValue(version)
       val value = valueSchema.read(buffer)
 
       if (version == 0) {
@@ -1264,6 +1277,14 @@ object GroupMetadataManager {
         val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V2).asInstanceOf[Long]
 
         OffsetAndMetadata(offset, metadata, commitTimestamp)
+      } else if (version == 3) {
+        val offset = value.get(OFFSET_VALUE_OFFSET_FIELD_V3).asInstanceOf[Long]
+        val leaderEpoch = value.get(OFFSET_VALUE_LEADER_EPOCH_FIELD_V3).asInstanceOf[Int]
+        val metadata = value.get(OFFSET_VALUE_METADATA_FIELD_V3).asInstanceOf[String]
+        val commitTimestamp = value.get(OFFSET_VALUE_COMMIT_TIMESTAMP_FIELD_V3).asInstanceOf[Long]
+
+        val leaderEpochOpt: Optional[Integer] = if (leaderEpoch < 0) Optional.empty() else Optional.of(leaderEpoch)
+        OffsetAndMetadata(offset, leaderEpochOpt, metadata, commitTimestamp)
       } else {
         throw new IllegalStateException(s"Unknown offset message version: $version")
       }
@@ -1282,7 +1303,7 @@ object GroupMetadataManager {
       null
     } else {
       val version = buffer.getShort
-      val valueSchema = schemaForGroup(version)
+      val valueSchema = schemaForGroupValue(version)
       val value = valueSchema.read(buffer)
 
       if (version >= 0 && version <= 2) {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 10119c8..d0be97f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -27,7 +27,7 @@ import java.util.{Collections, Optional, Properties}
 import kafka.admin.{AdminUtils, RackAwareMode}
 import kafka.api.{ApiVersion, KAFKA_0_11_0_IV0}
 import kafka.cluster.Partition
-import kafka.common.{OffsetAndMetadata, OffsetMetadata}
+import kafka.common.OffsetAndMetadata
 import kafka.controller.KafkaController
 import kafka.coordinator.group.{GroupCoordinator, JoinGroupResult}
 import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator}
@@ -340,9 +340,11 @@ class KafkaApis(val requestChannel: RequestChannel,
         //   - For v5 and beyond there is no per partition expiration timestamp, so this field is no longer in effect
         val currentTimestamp = time.milliseconds
         val partitionData = authorizedTopicRequestInfo.mapValues { partitionData =>
-          val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
+          val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
           new OffsetAndMetadata(
-            offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
+            offset = partitionData.offset,
+            leaderEpoch = partitionData.leaderEpoch,
+            metadata = metadata,
             commitTimestamp = partitionData.timestamp match {
               case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimestamp
               case customTimestamp => customTimestamp
@@ -1907,15 +1909,15 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def convertTxnOffsets(offsetsMap: immutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]): immutable.Map[TopicPartition, OffsetAndMetadata] = {
-    val offsetRetention = groupCoordinator.offsetConfig.offsetsRetentionMs
     val currentTimestamp = time.milliseconds
-    val defaultExpireTimestamp = offsetRetention + currentTimestamp
     offsetsMap.map { case (topicPartition, partitionData) =>
-      val metadata = if (partitionData.metadata == null) OffsetMetadata.NoMetadata else partitionData.metadata
+      val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata
       topicPartition -> new OffsetAndMetadata(
-        offsetMetadata = OffsetMetadata(partitionData.offset, metadata),
+        offset = partitionData.offset,
+        leaderEpoch = partitionData.leaderEpoch,
+        metadata = metadata,
         commitTimestamp = currentTimestamp,
-        expireTimestamp = Some(defaultExpireTimestamp))
+        expireTimestamp = None)
     }
   }
 
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 4417028..522ca49 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -14,7 +14,7 @@ package kafka.api
 
 import java.util
 import java.util.regex.Pattern
-import java.util.{Collections, Locale, Properties}
+import java.util.{Collections, Locale, Optional, Properties}
 
 import kafka.log.LogConfig
 import kafka.server.KafkaConfig
@@ -503,7 +503,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumer.assign(List(tp).asJava)
 
     // sync commit
-    val syncMetadata = new OffsetAndMetadata(5, "foo")
+    val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
     consumer.commitSync(Map((tp, syncMetadata)).asJava)
     assertEquals(syncMetadata, consumer.committed(tp))
 
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 58059b4..ab14db4 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -18,7 +18,7 @@
 package kafka.api
 
 import java.lang.{Long => JLong}
-import java.util.Properties
+import java.util.{Optional, Properties}
 import java.util.concurrent.TimeUnit
 
 import kafka.integration.KafkaServerTestHarness
@@ -365,6 +365,31 @@ class TransactionsTest extends KafkaServerTestHarness {
   }
 
   @Test
+  def testOffsetMetadataInSendOffsetsToTransaction() = {
+    val tp = new TopicPartition(topic1, 0)
+    val groupId = "group"
+
+    val producer = transactionalProducers.head
+    val consumer = createReadCommittedConsumer(groupId)
+
+    consumer.subscribe(List(topic1).asJava)
+
+    producer.initTransactions()
+
+    producer.beginTransaction()
+    val offsetAndMetadata = new OffsetAndMetadata(110L, Optional.of(15), "some metadata")
+    producer.sendOffsetsToTransaction(Map(tp -> offsetAndMetadata).asJava, groupId)
+    producer.commitTransaction()  // ok
+
+    // The call to commit the transaction may return before all markers are visible, so we initialize a second
+    // producer to ensure the transaction completes and the committed offsets are visible.
+    val producer2 = transactionalProducers(1)
+    producer2.initTransactions()
+
+    assertEquals(offsetAndMetadata, consumer.committed(tp))
+  }
+
+  @Test
   def testFencingOnSend() {
     val producer1 = transactionalProducers(0)
     val producer2 = transactionalProducers(1)
@@ -434,7 +459,7 @@ class TransactionsTest extends KafkaServerTestHarness {
       val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
       val recordMetadata = result.get()
       error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
-      servers.foreach { case (server) =>
+      servers.foreach { server =>
         error(s"log dirs: ${server.logManager.liveLogDirs.map(_.getAbsolutePath).head}")
       }
       fail("Should not be able to send messages from a fenced producer.")
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index befd22a..ef014c8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -17,24 +17,25 @@
 
 package kafka.coordinator.group
 
-import java.util.concurrent.{ ConcurrentHashMap, TimeUnit }
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import kafka.common.OffsetAndMetadata
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest
 import kafka.coordinator.AbstractCoordinatorConcurrencyTest._
 import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
-import kafka.server.{ DelayedOperationPurgatory, KafkaConfig }
+import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.JoinGroupRequest
+import org.apache.kafka.common.utils.Time
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.{ After, Before, Test }
+import org.junit.{After, Before, Test}
 
 import scala.collection._
 import scala.concurrent.duration.Duration
-import scala.concurrent.{ Await, Future, Promise, TimeoutException }
+import scala.concurrent.{Await, Future, Promise, TimeoutException}
 
 class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest[GroupMember] {
 
@@ -211,7 +212,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
     }
     override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
       val tp = new TopicPartition("topic", 0)
-      val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
+      val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
       groupCoordinator.handleCommitOffsets(member.groupId, member.memberId, member.generationId,
           offsets, responseCallback)
     }
@@ -224,7 +225,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
   class CommitTxnOffsetsOperation extends CommitOffsetsOperation {
     override def runWithCallback(member: GroupMember, responseCallback: CommitOffsetCallback): Unit = {
       val tp = new TopicPartition("topic", 0)
-      val offsets = immutable.Map(tp -> OffsetAndMetadata(1))
+      val offsets = immutable.Map(tp -> OffsetAndMetadata(1, "", Time.SYSTEM.milliseconds()))
       val producerId = 1000L
       val producerEpoch : Short = 2
       // When transaction offsets are appended to the log, transactions may be scheduled for
@@ -309,4 +310,5 @@ object GroupCoordinatorConcurrencyTest {
     @volatile var generationId: Int = -1
     def groupId: String = group.groupId
   }
+
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 608d7cc..9df16ad 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -17,6 +17,8 @@
 
 package kafka.coordinator.group
 
+import java.util.Optional
+
 import kafka.common.OffsetAndMetadata
 import kafka.server.{DelayedOperationPurgatory, KafkaConfig, ReplicaManager}
 import kafka.utils._
@@ -138,7 +140,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val topicPartition = new TopicPartition("foo", 0)
     var offsetCommitErrors = Map.empty[TopicPartition, Errors]
     groupCoordinator.handleCommitOffsets(otherGroupId, memberId, 1,
-      Map(topicPartition -> OffsetAndMetadata(15L)), result => { offsetCommitErrors = result })
+      Map(topicPartition -> offsetAndMetadata(15L)), result => { offsetCommitErrors = result })
     assertEquals(Some(Errors.COORDINATOR_LOAD_IN_PROGRESS), offsetCommitErrors.get(topicPartition))
 
     // Heartbeat
@@ -436,7 +438,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     val sessionTimeout = 1000
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols,
       rebalanceTimeout = sessionTimeout, sessionTimeout = sessionTimeout)
@@ -818,7 +820,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testCommitOffsetFromUnknownGroup() {
     val generationId = 1
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, memberId, generationId, Map(tp -> offset))
     assertEquals(Errors.ILLEGAL_GENERATION, commitOffsetResult(tp))
@@ -827,7 +829,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testCommitOffsetWithDefaultGeneration() {
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
       OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
@@ -854,7 +856,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // The simple offset commit should now fail
     EasyMock.reset(replicaManager)
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
       OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
@@ -865,17 +867,25 @@ class GroupCoordinatorTest extends JUnitSuite {
   }
 
   @Test
-  def testFetchOffsets() {
+  def testFetchOffsets(): Unit = {
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = 97L
+    val metadata = "some metadata"
+    val leaderEpoch = Optional.of[Integer](15)
+    val offsetAndMetadata = OffsetAndMetadata(offset, leaderEpoch, metadata, timer.time.milliseconds())
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
-      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offset))
+      OffsetCommitRequest.DEFAULT_GENERATION_ID, Map(tp -> offsetAndMetadata))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
     val (error, partitionData) = groupCoordinator.handleFetchOffsets(groupId, Some(Seq(tp)))
     assertEquals(Errors.NONE, error)
-    assertEquals(Some(0), partitionData.get(tp).map(_.offset))
+
+    val maybePartitionData = partitionData.get(tp)
+    assertTrue(maybePartitionData.isDefined)
+    assertEquals(offset, maybePartitionData.get.offset)
+    assertEquals(metadata, maybePartitionData.get.metadata)
+    assertEquals(leaderEpoch, maybePartitionData.get.leaderEpoch)
   }
 
   @Test
@@ -884,7 +894,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // To allow inspection and removal of the empty group, we must also support DescribeGroups and DeleteGroups
 
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
     val groupId = ""
 
     val commitOffsetResult = commitOffsets(groupId, OffsetCommitRequest.DEFAULT_MEMBER_ID,
@@ -919,7 +929,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testBasicFetchTxnOffsets() {
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
@@ -946,7 +956,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testFetchTxnOffsetsWithAbort() {
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
@@ -970,7 +980,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   @Test
   def testFetchTxnOffsetsIgnoreSpuriousCommit() {
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
     val producerId = 1000L
     val producerEpoch : Short = 2
 
@@ -1003,7 +1013,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // Marker for only one partition is received. That commit should be materialized while the other should not.
 
     val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
-    val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+    val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
     val producerId = 1000L
     val producerEpoch: Short = 3
 
@@ -1082,7 +1092,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     // Each partition's offsets should be materialized when the corresponding producer's marker is received.
 
     val partitions = List(new TopicPartition("topic1", 0), new TopicPartition("topic2", 0))
-    val offsets = List(OffsetAndMetadata(10), OffsetAndMetadata(15))
+    val offsets = List(offsetAndMetadata(10), offsetAndMetadata(15))
     val producerIds = List(1000L, 1005L)
     val producerEpochs: Seq[Short] = List(3, 4)
 
@@ -1154,9 +1164,9 @@ class GroupCoordinatorTest extends JUnitSuite {
     val tp1 = new TopicPartition("topic", 0)
     val tp2 = new TopicPartition("topic", 1)
     val tp3 = new TopicPartition("other-topic", 0)
-    val offset1 = OffsetAndMetadata(15)
-    val offset2 = OffsetAndMetadata(16)
-    val offset3 = OffsetAndMetadata(17)
+    val offset1 = offsetAndMetadata(15)
+    val offset2 = offsetAndMetadata(16)
+    val offset3 = offsetAndMetadata(17)
 
     assertEquals((Errors.NONE, Map.empty), groupCoordinator.handleFetchOffsets(groupId))
 
@@ -1179,7 +1189,7 @@ class GroupCoordinatorTest extends JUnitSuite {
   def testCommitOffsetInCompletingRebalance() {
     val memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
 
     val joinGroupResult = joinGroup(groupId, memberId, protocolType, protocols)
     val assignedMemberId = joinGroupResult.memberId
@@ -1425,7 +1435,7 @@ class GroupCoordinatorTest extends JUnitSuite {
 
     EasyMock.reset(replicaManager)
     val tp = new TopicPartition("topic", 0)
-    val offset = OffsetAndMetadata(0)
+    val offset = offsetAndMetadata(0)
     val commitOffsetResult = commitOffsets(groupId, assignedMemberId, joinGroupResult.generationId, Map(tp -> offset))
     assertEquals(Errors.NONE, commitOffsetResult(tp))
 
@@ -1730,4 +1740,8 @@ class GroupCoordinatorTest extends JUnitSuite {
     groupCoordinator.groupManager.handleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
 
+  private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
+    OffsetAndMetadata(offset, "", timer.time.milliseconds())
+  }
+
 }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index a0dfbda..b48f297 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.coordinator.group
 
-import kafka.api.{ApiVersion, KAFKA_1_1_IV0, KAFKA_2_1_IV0}
+import kafka.api._
 import kafka.cluster.Partition
 import kafka.common.OffsetAndMetadata
 import kafka.log.{Log, LogAppendInfo}
@@ -35,6 +35,7 @@ import org.easymock.{Capture, EasyMock, IAnswer}
 import org.junit.Assert.{assertEquals, assertFalse, assertNull, assertTrue}
 import org.junit.{Before, Test}
 import java.nio.ByteBuffer
+import java.util.Optional
 
 import com.yammer.metrics.Metrics
 import com.yammer.metrics.core.Gauge
@@ -932,7 +933,7 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
 
     expectAppendMessage(Errors.NONE)
     EasyMock.replay(replicaManager)
@@ -974,7 +975,8 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+    val offsetAndMetadata = OffsetAndMetadata(offset, "", time.milliseconds())
+    val offsets = immutable.Map(topicPartition -> offsetAndMetadata)
 
     val capturedResponseCallback = appendAndCaptureCallback()
     EasyMock.replay(replicaManager)
@@ -996,7 +998,7 @@ class GroupMetadataManagerTest {
     group.completePendingTxnOffsetCommit(producerId, isCommit = true)
     assertTrue(group.hasOffsets)
     assertFalse(group.allOffsets.isEmpty)
-    assertEquals(Some(OffsetAndMetadata(offset)), group.offset(topicPartition))
+    assertEquals(Some(offsetAndMetadata), group.offset(topicPartition))
 
     EasyMock.verify(replicaManager)
   }
@@ -1014,7 +1016,7 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
 
     val capturedResponseCallback = appendAndCaptureCallback()
     EasyMock.replay(replicaManager)
@@ -1053,7 +1055,7 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
 
     val capturedResponseCallback = appendAndCaptureCallback()
     EasyMock.replay(replicaManager)
@@ -1091,7 +1093,7 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
 
     EasyMock.replay(replicaManager)
 
@@ -1133,7 +1135,7 @@ class GroupMetadataManagerTest {
     val group = new GroupMetadata(groupId, Empty, time)
     groupMetadataManager.addGroup(group)
 
-    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset))
+    val offsets = immutable.Map(topicPartition -> OffsetAndMetadata(offset, "", time.milliseconds()))
 
     val capturedResponseCallback = appendAndCaptureCallback()
     EasyMock.replay(replicaManager)
@@ -1329,7 +1331,7 @@ class GroupMetadataManagerTest {
     // expire the offset after 1 millisecond
     val startMs = time.milliseconds
     val offsets = immutable.Map(
-      topicPartition1 -> OffsetAndMetadata(offset, "", startMs, startMs + 1),
+      topicPartition1 -> OffsetAndMetadata(offset, Optional.empty(), "", startMs, Some(startMs + 1)),
       topicPartition2 -> OffsetAndMetadata(offset, "", startMs, startMs + 3))
 
     mockGetPartition()
@@ -1633,7 +1635,7 @@ class GroupMetadataManagerTest {
     )
 
     val apiVersion = KAFKA_1_1_IV0
-    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTime = Some(100))
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, apiVersion = apiVersion, retentionTimeOpt = Some(100))
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId, apiVersion = apiVersion)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
@@ -1673,7 +1675,7 @@ class GroupMetadataManagerTest {
       new TopicPartition("bar", 0) -> 8992L
     )
 
-    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTime = Some(100))
+    val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets, retentionTimeOpt = Some(100))
     val memberId = "98098230493"
     val groupMetadataRecord = buildStableGroupRecordWithMember(generation, protocolType, protocol, memberId)
     val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
@@ -1700,6 +1702,70 @@ class GroupMetadataManagerTest {
     }
   }
 
+  @Test
+  def testSerdeOffsetCommitValue(): Unit = {
+    val offsetAndMetadata = OffsetAndMetadata(
+      offset = 537L,
+      leaderEpoch = Optional.of(15),
+      metadata = "metadata",
+      commitTimestamp = time.milliseconds(),
+      expireTimestamp = None)
+
+    def verifySerde(apiVersion: ApiVersion, expectedOffsetCommitValueVersion: Int): Unit = {
+      val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
+      val buffer = ByteBuffer.wrap(bytes)
+
+      assertEquals(expectedOffsetCommitValueVersion, buffer.getShort(0).toInt)
+
+      val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+      assertEquals(offsetAndMetadata.offset, deserializedOffsetAndMetadata.offset)
+      assertEquals(offsetAndMetadata.metadata, deserializedOffsetAndMetadata.metadata)
+      assertEquals(offsetAndMetadata.commitTimestamp, deserializedOffsetAndMetadata.commitTimestamp)
+
+      // Serialization drops the leader epoch silently if an older inter-broker protocol is in use
+      val expectedLeaderEpoch = if (expectedOffsetCommitValueVersion >= 3)
+        offsetAndMetadata.leaderEpoch
+      else
+        Optional.empty()
+
+      assertEquals(expectedLeaderEpoch, deserializedOffsetAndMetadata.leaderEpoch)
+    }
+
+    for (version <- ApiVersion.allVersions) {
+      val expectedSchemaVersion = version match {
+        case v if v < KAFKA_2_1_IV0 => 1
+        case v if v < KAFKA_2_1_IV1 => 2
+        case _ => 3
+      }
+      verifySerde(version, expectedSchemaVersion)
+    }
+  }
+
+  @Test
+  def testSerdeOffsetCommitValueWithExpireTimestamp(): Unit = {
+    // If expire timestamp is set, we should always use version 1 of the offset commit
+    // value schema since later versions do not support it
+
+    val offsetAndMetadata = OffsetAndMetadata(
+      offset = 537L,
+      leaderEpoch = Optional.empty(),
+      metadata = "metadata",
+      commitTimestamp = time.milliseconds(),
+      expireTimestamp = Some(time.milliseconds() + 1000))
+
+    def verifySerde(apiVersion: ApiVersion): Unit = {
+      val bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
+      val buffer = ByteBuffer.wrap(bytes)
+      assertEquals(1, buffer.getShort(0).toInt)
+
+      val deserializedOffsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(buffer)
+      assertEquals(offsetAndMetadata, deserializedOffsetAndMetadata)
+    }
+
+    for (version <- ApiVersion.allVersions)
+      verifySerde(version)
+  }
+
   private def appendAndCaptureCallback(): Capture[Map[TopicPartition, PartitionResponse] => Unit] = {
     val capturedArgument: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
     EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
@@ -1804,14 +1870,15 @@ class GroupMetadataManagerTest {
   private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
                                            groupId: String = groupId,
                                            apiVersion: ApiVersion = ApiVersion.latestVersion,
-                                           retentionTime: Option[Long] = None): Seq[SimpleRecord] = {
+                                           retentionTimeOpt: Option[Long] = None): Seq[SimpleRecord] = {
     committedOffsets.map { case (topicPartition, offset) =>
-      val offsetAndMetadata = retentionTime match {
-        case Some(timestamp) =>
-          val commitTimestamp = time.milliseconds()
-          OffsetAndMetadata(offset, "", commitTimestamp, commitTimestamp + timestamp)
+      val commitTimestamp = time.milliseconds()
+      val offsetAndMetadata = retentionTimeOpt match {
+        case Some(retentionTimeMs) =>
+          val expirationTime = commitTimestamp + retentionTimeMs
+          OffsetAndMetadata(offset, "", commitTimestamp, expirationTime)
         case None =>
-          OffsetAndMetadata(offset)
+          OffsetAndMetadata(offset, "", commitTimestamp)
       }
       val offsetCommitKey = GroupMetadataManager.offsetCommitKey(groupId, topicPartition)
       val offsetCommitValue = GroupMetadataManager.offsetCommitValue(offsetAndMetadata, apiVersion)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index ac12804..f45a6e2 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -293,7 +293,7 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testOffsetCommit(): Unit = {
     val partition = new TopicPartition("foo", 0)
-    val offset = OffsetAndMetadata(37)
+    val offset = offsetAndMetadata(37)
     val commitRecordOffset = 3
 
     group.prepareOffsetCommit(Map(partition -> offset))
@@ -308,7 +308,7 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testOffsetCommitFailure(): Unit = {
     val partition = new TopicPartition("foo", 0)
-    val offset = OffsetAndMetadata(37)
+    val offset = offsetAndMetadata(37)
 
     group.prepareOffsetCommit(Map(partition -> offset))
     assertTrue(group.hasOffsets)
@@ -322,8 +322,8 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testOffsetCommitFailureWithAnotherPending(): Unit = {
     val partition = new TopicPartition("foo", 0)
-    val firstOffset = OffsetAndMetadata(37)
-    val secondOffset = OffsetAndMetadata(57)
+    val firstOffset = offsetAndMetadata(37)
+    val secondOffset = offsetAndMetadata(57)
 
     group.prepareOffsetCommit(Map(partition -> firstOffset))
     assertTrue(group.hasOffsets)
@@ -344,8 +344,8 @@ class GroupMetadataTest extends JUnitSuite {
   @Test
   def testOffsetCommitWithAnotherPending(): Unit = {
     val partition = new TopicPartition("foo", 0)
-    val firstOffset = OffsetAndMetadata(37)
-    val secondOffset = OffsetAndMetadata(57)
+    val firstOffset = offsetAndMetadata(37)
+    val secondOffset = offsetAndMetadata(57)
 
     group.prepareOffsetCommit(Map(partition -> firstOffset))
     assertTrue(group.hasOffsets)
@@ -367,8 +367,8 @@ class GroupMetadataTest extends JUnitSuite {
   def testConsumerBeatsTransactionalOffsetCommit(): Unit = {
     val partition = new TopicPartition("foo", 0)
     val producerId = 13232L
-    val txnOffsetCommit = OffsetAndMetadata(37)
-    val consumerOffsetCommit = OffsetAndMetadata(57)
+    val txnOffsetCommit = offsetAndMetadata(37)
+    val consumerOffsetCommit = offsetAndMetadata(57)
 
     group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
     assertTrue(group.hasOffsets)
@@ -392,8 +392,8 @@ class GroupMetadataTest extends JUnitSuite {
   def testTransactionBeatsConsumerOffsetCommit(): Unit = {
     val partition = new TopicPartition("foo", 0)
     val producerId = 13232L
-    val txnOffsetCommit = OffsetAndMetadata(37)
-    val consumerOffsetCommit = OffsetAndMetadata(57)
+    val txnOffsetCommit = offsetAndMetadata(37)
+    val consumerOffsetCommit = offsetAndMetadata(57)
 
     group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
     assertTrue(group.hasOffsets)
@@ -419,8 +419,8 @@ class GroupMetadataTest extends JUnitSuite {
   def testTransactionalCommitIsAbortedAndConsumerCommitWins(): Unit = {
     val partition = new TopicPartition("foo", 0)
     val producerId = 13232L
-    val txnOffsetCommit = OffsetAndMetadata(37)
-    val consumerOffsetCommit = OffsetAndMetadata(57)
+    val txnOffsetCommit = offsetAndMetadata(37)
+    val consumerOffsetCommit = offsetAndMetadata(57)
 
     group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
     assertTrue(group.hasOffsets)
@@ -447,7 +447,7 @@ class GroupMetadataTest extends JUnitSuite {
   def testFailedTxnOffsetCommitLeavesNoPendingState(): Unit = {
     val partition = new TopicPartition("foo", 0)
     val producerId = 13232L
-    val txnOffsetCommit = OffsetAndMetadata(37)
+    val txnOffsetCommit = offsetAndMetadata(37)
 
     group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
     assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
@@ -471,4 +471,9 @@ class GroupMetadataTest extends JUnitSuite {
     }
     assertTrue(group.is(targetState))
   }
+
+  private def offsetAndMetadata(offset: Long): OffsetAndMetadata = {
+    OffsetAndMetadata(offset, "", Time.SYSTEM.milliseconds())
+  }
+
 }