You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/23 16:46:59 UTC

kafka git commit: KAFKA-5247; Materialize committed offsets in offset order

Repository: kafka
Updated Branches:
  refs/heads/trunk e3e2f1d22 -> d1853f791


KAFKA-5247; Materialize committed offsets in offset order

With this patch, offset commits are always materialized according to the order of the commit records in the offsets topic.

Before this patch, transactional offset commits were materialized in transaction order. However, the log cleaner will always preserve the record with the greatest offset. This meant that if there was a mix of offset commits from a consumer and a transactional producer, then it we would switch from transactional order to offset order after cleaning, resulting in an inconsistent state.

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3108 from apurvam/KAFKA-5247-materialize-committed-offsets-in-offset-order


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1853f79
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1853f79
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1853f79

Branch: refs/heads/trunk
Commit: d1853f791147d66303c2700a8e58af762518188c
Parents: e3e2f1d
Author: Apurva Mehta <ap...@confluent.io>
Authored: Tue May 23 09:45:31 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Tue May 23 09:45:31 2017 -0700

----------------------------------------------------------------------
 .../kafka/coordinator/group/GroupMetadata.scala |  96 ++++++++++++----
 .../group/GroupMetadataManager.scala            |  50 +++++----
 .../group/GroupMetadataManagerTest.scala        | 102 ++++++++++++++++-
 .../coordinator/group/GroupMetadataTest.scala   | 109 ++++++++++++++++++-
 4 files changed, 310 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1853f79/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
index 2f76d63..302fcb5 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadata.scala
@@ -127,6 +127,16 @@ case class GroupSummary(state: String,
                         members: List[MemberSummary])
 
 /**
+  * We cache offset commits along with their commit record offset. This enables us to ensure that the latest offset
+  * commit is always materialized when we have a mix of transactional and regular offset commits. Without preserving
+  * information of the commit record offset, compaction of the offsets topic it self may result in the wrong offset commit
+  * being materialized.
+  */
+case class CommitRecordMetadataAndOffset(appendedBatchOffset: Option[Long], offsetAndMetadata: OffsetAndMetadata) {
+  def olderThan(that: CommitRecordMetadataAndOffset) : Boolean = appendedBatchOffset.get < that.appendedBatchOffset.get
+}
+
+/**
  * Group contains the following metadata:
  *
  *  Membership metadata:
@@ -143,12 +153,17 @@ case class GroupSummary(state: String,
 private[group] class GroupMetadata(val groupId: String, initialState: GroupState = Empty) extends Logging {
 
   private var state: GroupState = initialState
+
   private val members = new mutable.HashMap[String, MemberMetadata]
-  private val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
+
+  private val offsets = new mutable.HashMap[TopicPartition, CommitRecordMetadataAndOffset]
+
   private val pendingOffsetCommits = new mutable.HashMap[TopicPartition, OffsetAndMetadata]
-  // A map from a producer id to the open offset commits for that producer id.
-  private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]()
+
+  private val pendingTransactionalOffsetCommits = new mutable.HashMap[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]()
+
   private var receivedTransactionalOffsetCommits = false
+
   private var receivedConsumerOffsetCommits = false
 
   var protocolType: Option[String] = None
@@ -271,19 +286,27 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     GroupOverview(groupId, protocolType.getOrElse(""))
   }
 
-  def initializeOffsets(offsets: collection.Map[TopicPartition, OffsetAndMetadata],
-                        pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]) {
+  def initializeOffsets(offsets: collection.Map[TopicPartition, CommitRecordMetadataAndOffset],
+                        pendingTxnOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) {
     this.offsets ++= offsets
     this.pendingTransactionalOffsetCommits ++= pendingTxnOffsets
   }
 
-  def completePendingOffsetWrite(topicPartition: TopicPartition, offset: OffsetAndMetadata) {
-    if (pendingOffsetCommits.contains(topicPartition))
-      offsets.put(topicPartition, offset)
+  def onOffsetCommitAppend(topicPartition: TopicPartition, offsetWithCommitRecordMetadata: CommitRecordMetadataAndOffset) {
+    if (pendingOffsetCommits.contains(topicPartition)) {
+      if (offsetWithCommitRecordMetadata.appendedBatchOffset.isEmpty)
+        throw new IllegalStateException("Cannot complete offset commit write without providing the metadata of the record " +
+          "in the log.")
+      if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(offsetWithCommitRecordMetadata))
+        offsets.put(topicPartition, offsetWithCommitRecordMetadata)
+    }
 
     pendingOffsetCommits.get(topicPartition) match {
-      case Some(stagedOffset) if offset == stagedOffset => pendingOffsetCommits.remove(topicPartition)
+      case Some(stagedOffset) if offsetWithCommitRecordMetadata.offsetAndMetadata == stagedOffset =>
+        pendingOffsetCommits.remove(topicPartition)
       case _ =>
+        // The pendingOffsetCommits for this partition could be empty if the topic was deleted, in which case
+        // its entries would be removed from the cache by the `removeOffsets` method.
     }
   }
 
@@ -301,8 +324,12 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
 
   def prepareTxnOffsetCommit(producerId: Long, offsets: Map[TopicPartition, OffsetAndMetadata]) {
     receivedTransactionalOffsetCommits = true
-    val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, OffsetAndMetadata])
-    producerOffsets ++= offsets
+    val producerOffsets = pendingTransactionalOffsetCommits.getOrElseUpdate(producerId,
+      mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+
+    offsets.foreach { case (topicPartition, offsetAndMetadata) =>
+      producerOffsets.put(topicPartition, CommitRecordMetadataAndOffset(None, offsetAndMetadata))
+    }
   }
 
   def hasReceivedConsistentOffsetCommits : Boolean = {
@@ -323,14 +350,32 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
     }
   }
 
+  def onTxnOffsetCommitAppend(producerId: Long, topicPartition: TopicPartition,
+                              commitRecordMetadataAndOffset: CommitRecordMetadataAndOffset) {
+    pendingTransactionalOffsetCommits.get(producerId) match {
+      case Some(pendingOffset) =>
+        if (pendingOffset.contains(topicPartition)
+          && pendingOffset(topicPartition).offsetAndMetadata == commitRecordMetadataAndOffset.offsetAndMetadata)
+          pendingOffset.update(topicPartition, commitRecordMetadataAndOffset)
+      case _ =>
+        // We may hit this case if the partition in question has emigrated.
+    }
+  }
+
   /* Complete a pending transactional offset commit. This is called after a commit or abort marker is fully written
    * to the log.
    */
   def completePendingTxnOffsetCommit(producerId: Long, isCommit: Boolean): Unit = {
     trace(s"Completing transactional offset commit for producer $producerId and group $groupId. isCommit: $isCommit")
     if (isCommit) {
-      val producerOffsets = pendingTransactionalOffsetCommits.getOrElse(producerId, Map.empty[TopicPartition, OffsetAndMetadata])
-      offsets ++= producerOffsets
+      val producerOffsets = pendingTransactionalOffsetCommits.getOrElse(producerId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+      producerOffsets.foreach { case (topicPartition, commitRecordMetadataAndOffset) =>
+        if (commitRecordMetadataAndOffset.appendedBatchOffset.isEmpty)
+          throw new IllegalStateException(s"Trying to complete a transactional offset commit for producerId $producerId " +
+            s"and groupId $groupId even though the the offset commit record itself hasn't been appended to the log.")
+        if (!offsets.contains(topicPartition) || offsets(topicPartition).olderThan(commitRecordMetadataAndOffset))
+          offsets.put(topicPartition, commitRecordMetadataAndOffset)
+      }
     }
     pendingTransactionalOffsetCommits.remove(producerId)
   }
@@ -347,21 +392,32 @@ private[group] class GroupMetadata(val groupId: String, initialState: GroupState
         pendingOffsets.remove(topicPartition)
       }
       val removedOffset = offsets.remove(topicPartition)
-      removedOffset.map(topicPartition -> _)
+      removedOffset.map(topicPartition -> _.offsetAndMetadata)
     }.toMap
   }
 
-  def removeExpiredOffsets(startMs: Long) = {
-    val expiredOffsets = offsets.filter {
-      case (topicPartition, offset) => offset.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
-    }
+  def removeExpiredOffsets(startMs: Long) : Map[TopicPartition, OffsetAndMetadata] = {
+    val expiredOffsets = offsets
+      .filter {
+        case (topicPartition, commitRecordMetadataAndOffset) =>
+          commitRecordMetadataAndOffset.offsetAndMetadata.expireTimestamp < startMs && !pendingOffsetCommits.contains(topicPartition)
+      }
+      .map {
+        case (topicPartition, commitRecordOffsetAndMetadata) =>
+          (topicPartition, commitRecordOffsetAndMetadata.offsetAndMetadata)
+      }
     offsets --= expiredOffsets.keySet
     expiredOffsets.toMap
   }
 
-  def allOffsets = offsets.toMap
+  def allOffsets = offsets.map { case (topicPartition, commitRecordMetadataAndOffset) =>
+    (topicPartition, commitRecordMetadataAndOffset.offsetAndMetadata)
+  }.toMap
+
+  def offset(topicPartition: TopicPartition) : Option[OffsetAndMetadata] = offsets.get(topicPartition).map(_.offsetAndMetadata)
 
-  def offset(topicPartition: TopicPartition) = offsets.get(topicPartition)
+  // visible for testing
+  private[group] def offsetWithRecordMetadata(topicPartition: TopicPartition): Option[CommitRecordMetadataAndOffset] = offsets.get(topicPartition)
 
   def numOffsets = offsets.size
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1853f79/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 74a3f7b..f8f536e 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -309,9 +309,12 @@ class GroupMetadataManager(brokerId: Int,
 
             val responseError = group synchronized {
               if (status.error == Errors.NONE) {
-                if (!group.is(Dead) && !isTxnOffsetCommit) {
+                if (!group.is(Dead)) {
                   filteredOffsetMetadata.foreach { case (topicPartition, offsetAndMetadata) =>
-                    group.completePendingOffsetWrite(topicPartition, offsetAndMetadata)
+                    if (isTxnOffsetCommit)
+                      group.onTxnOffsetCommitAppend(producerId, topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
+                    else
+                      group.onOffsetCommitAppend(topicPartition, CommitRecordMetadataAndOffset(Some(status.baseOffset), offsetAndMetadata))
                   }
                 }
                 Errors.NONE
@@ -473,8 +476,8 @@ class GroupMetadataManager(brokerId: Int,
         lazy val buffer = ByteBuffer.allocate(config.loadBufferSize)
 
         // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
-        val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
-        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, OffsetAndMetadata]]()
+        val loadedOffsets = mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]()
+        val pendingOffsets = mutable.Map[Long, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]]()
         val loadedGroups = mutable.Map[String, GroupMetadata]()
         val removedGroups = mutable.Set[String]()
 
@@ -496,10 +499,11 @@ class GroupMetadataManager(brokerId: Int,
               val record = batch.iterator.next()
               val controlRecord = ControlRecordType.parse(record.key)
               if (controlRecord == ControlRecordType.COMMIT) {
-                pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, OffsetAndMetadata]())
+                pendingOffsets.getOrElse(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
                   .foreach {
-                    case (groupTopicPartition, offsetAndMetadata) =>
-                      loadedOffsets.put(groupTopicPartition, offsetAndMetadata)
+                    case (groupTopicPartition, commitRecordMetadataAndOffset) =>
+                      if (!loadedOffsets.contains(groupTopicPartition) || loadedOffsets(groupTopicPartition).olderThan(commitRecordMetadataAndOffset))
+                        loadedOffsets.put(groupTopicPartition, commitRecordMetadataAndOffset)
                   }
               }
               pendingOffsets.remove(batch.producerId)
@@ -510,7 +514,7 @@ class GroupMetadataManager(brokerId: Int,
 
                   case offsetKey: OffsetKey =>
                     if (isTxnOffsetCommit && !pendingOffsets.contains(batch.producerId))
-                      pendingOffsets.put(batch.producerId, mutable.Map[GroupTopicPartition, OffsetAndMetadata]())
+                      pendingOffsets.put(batch.producerId, mutable.Map[GroupTopicPartition, CommitRecordMetadataAndOffset]())
 
                     // load offset
                     val groupTopicPartition = offsetKey.key
@@ -520,11 +524,11 @@ class GroupMetadataManager(brokerId: Int,
                       else
                         loadedOffsets.remove(groupTopicPartition)
                     } else {
-                      val value = GroupMetadataManager.readOffsetMessageValue(record.value)
+                      val offsetAndMetadata = GroupMetadataManager.readOffsetMessageValue(record.value)
                       if (isTxnOffsetCommit)
-                        pendingOffsets(batch.producerId).put(groupTopicPartition, value)
+                        pendingOffsets(batch.producerId).put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
                       else
-                        loadedOffsets.put(groupTopicPartition, value)
+                        loadedOffsets.put(groupTopicPartition, CommitRecordMetadataAndOffset(Some(batch.baseOffset), offsetAndMetadata))
                     }
 
                   case groupMetadataKey: GroupMetadataKey =>
@@ -554,15 +558,15 @@ class GroupMetadataManager(brokerId: Int,
             .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
             .partition { case (group, _) => loadedGroups.contains(group) }
 
-          val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]]()
+          val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
           pendingOffsets.foreach { case (producerId, producerOffsets) =>
             producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
             producerOffsets
               .groupBy(_._1.group)
               .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
               .foreach { case (group, offsets) =>
-                val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]])
-                val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, OffsetAndMetadata])
+                val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+                val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
                 groupProducerOffsets ++= offsets
               }
           }
@@ -571,8 +575,8 @@ class GroupMetadataManager(brokerId: Int,
             .partition { case (group, _) => loadedGroups.contains(group)}
 
           loadedGroups.values.foreach { group =>
-            val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, OffsetAndMetadata])
-            val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]])
+            val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+            val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
             loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }
@@ -581,8 +585,8 @@ class GroupMetadataManager(brokerId: Int,
           // metadata stored in the log
           (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
             val group = new GroupMetadata(groupId)
-            val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, OffsetAndMetadata])
-            val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, OffsetAndMetadata]])
+            val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+            val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
             loadGroup(group, offsets, pendingOffsets)
             onGroupLoaded(group)
           }
@@ -603,17 +607,19 @@ class GroupMetadataManager(brokerId: Int,
     }
   }
 
-  private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, OffsetAndMetadata],
-                        pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, OffsetAndMetadata]]): Unit = {
+  private def loadGroup(group: GroupMetadata, offsets: Map[TopicPartition, CommitRecordMetadataAndOffset],
+                        pendingTransactionalOffsets: Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]): Unit = {
     // offsets are initialized prior to loading the group into the cache to ensure that clients see a consistent
     // view of the group's offsets
-    val loadedOffsets = offsets.mapValues { offsetAndMetadata =>
+    val loadedOffsets = offsets.mapValues { case CommitRecordMetadataAndOffset(commitRecordOffset, offsetAndMetadata) =>
       // special handling for version 0:
       // set the expiration time stamp as commit time stamp + server default retention time
-      if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
+      val updatedOffsetAndMetadata =
+        if (offsetAndMetadata.expireTimestamp == org.apache.kafka.common.requests.OffsetCommitRequest.DEFAULT_TIMESTAMP)
         offsetAndMetadata.copy(expireTimestamp = offsetAndMetadata.commitTimestamp + config.offsetsRetentionMs)
       else
         offsetAndMetadata
+      CommitRecordMetadataAndOffset(commitRecordOffset, updatedOffsetAndMetadata)
     }
     trace(s"Initialized offsets $loadedOffsets for group ${group.groupId}")
     group.initializeOffsets(loadedOffsets, pendingTransactionalOffsets.toMap)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1853f79/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index f76eb7b..8318741 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -278,6 +278,7 @@ class GroupMetadataManagerTest {
 
     val buffer = ByteBuffer.allocate(1024)
     var nextOffset = 0
+    val commitOffsetsLogPosition = nextOffset
     nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, committedOffsets)
     nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true)
     nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, abortedOffsets)
@@ -301,6 +302,7 @@ class GroupMetadataManagerTest {
     assertEquals(committedOffsets.size, group.allOffsets.size)
     committedOffsets.foreach { case (topicPartition, offset) =>
       assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+      assertEquals(Some(commitOffsetsLogPosition), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset)
     }
 
     // We should have pending commits.
@@ -335,9 +337,13 @@ class GroupMetadataManagerTest {
     )
 
     val buffer = ByteBuffer.allocate(1024)
-    var nextOffset = 0
+    var nextOffset = 0L
+
+    val firstProduceRecordOffset = nextOffset
     nextOffset += appendTransactionalOffsetCommits(buffer, firstProducerId, firstProducerEpoch, nextOffset, committedOffsetsFirstProducer)
     nextOffset += completeTransactionalOffsetCommit(buffer, firstProducerId, firstProducerEpoch, nextOffset, isCommit = true)
+
+    val secondProducerRecordOffset = nextOffset
     nextOffset += appendTransactionalOffsetCommits(buffer, secondProducerId, secondProducerEpoch, nextOffset, committedOffsetsSecondProducer)
     nextOffset += completeTransactionalOffsetCommit(buffer, secondProducerId, secondProducerEpoch, nextOffset, isCommit = true)
     buffer.flip()
@@ -358,12 +364,106 @@ class GroupMetadataManagerTest {
     assertEquals(committedOffsetsFirstProducer.size + committedOffsetsSecondProducer.size, group.allOffsets.size)
     committedOffsetsFirstProducer.foreach { case (topicPartition, offset) =>
       assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+      assertEquals(Some(firstProduceRecordOffset), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset)
     }
     committedOffsetsSecondProducer.foreach { case (topicPartition, offset) =>
       assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+      assertEquals(Some(secondProducerRecordOffset), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset)
+    }
+  }
+
+  @Test
+  def testGroupLoadWithConsumerAndTransactionalOffsetCommitsConsumerWins(): Unit = {
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val producerId = 1000L
+    val producerEpoch: Short = 2
+
+    val transactionalOffsetCommits = Map(
+      new TopicPartition("foo", 0) -> 23L
+    )
+
+    val consumerOffsetCommits = Map(
+      new TopicPartition("foo", 0) -> 24L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, transactionalOffsetCommits)
+    val consumerRecordOffset = nextOffset
+    nextOffset += appendConsumerOffsetCommit(buffer, nextOffset, consumerOffsetCommits)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    // The group should be loaded with pending offsets.
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(1, group.allOffsets.size)
+    assertTrue(group.hasOffsets)
+    assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+    assertEquals(consumerOffsetCommits.size, group.allOffsets.size)
+    consumerOffsetCommits.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+      assertEquals(Some(consumerRecordOffset), group.offsetWithRecordMetadata(topicPartition).head.appendedBatchOffset)
     }
   }
 
+  @Test
+  def testGroupLoadWithConsumerAndTransactionalOffsetCommitsTransactionWins(): Unit = {
+    val groupMetadataTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
+    val producerId = 1000L
+    val producerEpoch: Short = 2
+
+    val transactionalOffsetCommits = Map(
+      new TopicPartition("foo", 0) -> 23L
+    )
+
+    val consumerOffsetCommits = Map(
+      new TopicPartition("foo", 0) -> 24L
+    )
+
+    val buffer = ByteBuffer.allocate(1024)
+    var nextOffset = 0
+    nextOffset += appendConsumerOffsetCommit(buffer, nextOffset, consumerOffsetCommits)
+    nextOffset += appendTransactionalOffsetCommits(buffer, producerId, producerEpoch, nextOffset, transactionalOffsetCommits)
+    nextOffset += completeTransactionalOffsetCommit(buffer, producerId, producerEpoch, nextOffset, isCommit = true)
+    buffer.flip()
+
+    val records = MemoryRecords.readableRecords(buffer)
+    expectGroupMetadataLoad(groupMetadataTopicPartition, 0, records)
+
+    EasyMock.replay(replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+    // The group should be loaded with pending offsets.
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Empty, group.currentState)
+    assertEquals(1, group.allOffsets.size)
+    assertTrue(group.hasOffsets)
+    assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+    assertEquals(consumerOffsetCommits.size, group.allOffsets.size)
+    transactionalOffsetCommits.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+  private def appendConsumerOffsetCommit(buffer: ByteBuffer, baseOffset: Long, offsets: Map[TopicPartition, Long]) = {
+    val builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.LOG_APPEND_TIME, baseOffset)
+    val commitRecords = createCommittedOffsetRecords(offsets)
+    commitRecords.foreach(builder.append)
+    builder.build()
+    offsets.size
+  }
+
   private def appendTransactionalOffsetCommits(buffer: ByteBuffer, producerId: Long, producerEpoch: Short,
                                                baseOffset: Long, offsets: Map[TopicPartition, Long]): Int = {
     val builder = MemoryRecords.builder(buffer, CompressionType.NONE, baseOffset, producerId, producerEpoch, 0, true)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1853f79/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
index e62c0d3..0e13f89 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataTest.scala
@@ -296,12 +296,13 @@ class GroupMetadataTest extends JUnitSuite {
   def testOffsetCommit(): Unit = {
     val partition = new TopicPartition("foo", 0)
     val offset = OffsetAndMetadata(37)
+    val commitRecordOffset = 3
 
     group.prepareOffsetCommit(Map(partition -> offset))
     assertTrue(group.hasOffsets)
     assertEquals(None, group.offset(partition))
 
-    group.completePendingOffsetWrite(partition, offset)
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(commitRecordOffset), offset))
     assertTrue(group.hasOffsets)
     assertEquals(Some(offset), group.offset(partition))
   }
@@ -337,7 +338,7 @@ class GroupMetadataTest extends JUnitSuite {
     assertTrue(group.hasOffsets)
     assertEquals(None, group.offset(partition))
 
-    group.completePendingOffsetWrite(partition, secondOffset)
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(3L), secondOffset))
     assertTrue(group.hasOffsets)
     assertEquals(Some(secondOffset), group.offset(partition))
   }
@@ -355,15 +356,115 @@ class GroupMetadataTest extends JUnitSuite {
     group.prepareOffsetCommit(Map(partition -> secondOffset))
     assertTrue(group.hasOffsets)
 
-    group.completePendingOffsetWrite(partition, firstOffset)
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(4L), firstOffset))
     assertTrue(group.hasOffsets)
     assertEquals(Some(firstOffset), group.offset(partition))
 
-    group.completePendingOffsetWrite(partition, secondOffset)
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(5L), secondOffset))
     assertTrue(group.hasOffsets)
     assertEquals(Some(secondOffset), group.offset(partition))
   }
 
+  @Test
+  def testConsumerBeatsTransactionalOffsetCommit(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val producerId = 13232L
+    val txnOffsetCommit = OffsetAndMetadata(37)
+    val consumerOffsetCommit = OffsetAndMetadata(57)
+
+    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+    assertTrue(group.hasOffsets)
+
+    group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(3L), txnOffsetCommit))
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(4L), consumerOffsetCommit))
+    assertTrue(group.hasOffsets)
+    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)
+    assertTrue(group.hasOffsets)
+    // This is the crucial assertion which validates that we materialize offsets in offset order, not transactional order.
+    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+  }
+
+  @Test
+  def testTransactionBeatsConsumerOffsetCommit(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val producerId = 13232L
+    val txnOffsetCommit = OffsetAndMetadata(37)
+    val consumerOffsetCommit = OffsetAndMetadata(57)
+
+    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+    assertTrue(group.hasOffsets)
+
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
+    group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
+    assertTrue(group.hasOffsets)
+    // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit.
+    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)
+    assertTrue(group.hasOffsets)
+    // The transactional offset commit has been materialized and the transactional commit record is later in the log,
+    // so it should be materialized.
+    assertEquals(Some(txnOffsetCommit), group.offset(partition))
+  }
+
+  @Test
+  def testTransactionalCommitIsAbortedAndConsumerCommitWins(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val producerId = 13232L
+    val txnOffsetCommit = OffsetAndMetadata(37)
+    val consumerOffsetCommit = OffsetAndMetadata(57)
+
+    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+
+    group.prepareOffsetCommit(Map(partition -> consumerOffsetCommit))
+    assertTrue(group.hasOffsets)
+
+    group.onOffsetCommitAppend(partition, CommitRecordMetadataAndOffset(Some(3L), consumerOffsetCommit))
+    group.onTxnOffsetCommitAppend(producerId, partition, CommitRecordMetadataAndOffset(Some(4L), txnOffsetCommit))
+    assertTrue(group.hasOffsets)
+    // The transactional offset commit hasn't been committed yet, so we should materialize the consumer offset commit.
+    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+
+    group.completePendingTxnOffsetCommit(producerId, isCommit = false)
+    assertTrue(group.hasOffsets)
+    // The transactional offset commit should be discarded and the consumer offset commit should continue to be
+    // materialized.
+    assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+    assertEquals(Some(consumerOffsetCommit), group.offset(partition))
+  }
+
+  @Test
+  def testFailedTxnOffsetCommitLeavesNoPendingState(): Unit = {
+    val partition = new TopicPartition("foo", 0)
+    val producerId = 13232L
+    val txnOffsetCommit = OffsetAndMetadata(37)
+
+    group.prepareTxnOffsetCommit(producerId, Map(partition -> txnOffsetCommit))
+    assertTrue(group.hasPendingOffsetCommitsFromProducer(producerId))
+    assertTrue(group.hasOffsets)
+    assertEquals(None, group.offset(partition))
+    group.failPendingTxnOffsetCommit(producerId, partition, txnOffsetCommit)
+    assertFalse(group.hasOffsets)
+    assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+
+    // The commit marker should now have no effect.
+    group.completePendingTxnOffsetCommit(producerId, isCommit = true)
+    assertFalse(group.hasOffsets)
+    assertFalse(group.hasPendingOffsetCommitsFromProducer(producerId))
+  }
+
   private def assertState(group: GroupMetadata, targetState: GroupState) {
     val states: Set[GroupState] = Set(Stable, PreparingRebalance, AwaitingSync, Dead)
     val otherStates = states - targetState