You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by da...@apache.org on 2024/01/23 07:26:51 UTC

(kafka) branch trunk updated: KAFKA-14505; [7/N] Always materialize the most recent committed offset (#15183)

This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4d6a422e860 KAFKA-14505; [7/N] Always materialize the most recent committed offset (#15183)
4d6a422e860 is described below

commit 4d6a422e8607c257e58b6956061732dc33621fc2
Author: David Jacot <dj...@confluent.io>
AuthorDate: Mon Jan 22 23:26:40 2024 -0800

    KAFKA-14505; [7/N] Always materialize the most recent committed offset (#15183)
    
    When transactional offset commits are eventually committed, we must always keep the most recent committed offset when we have a mix of transactional and regular offset commits. We achieve this by storing the offset of the offset commit record alongside the committed offset in memory. Without preserving the offset of the commit record, compaction of the __consumer_offsets topic itself may result in the wrong offset commit being materialized.
    
    Reviewers: Jeff Kim <je...@confluent.io>, Justine Olshan <jo...@confluent.io>
---
 .../coordinator/group/CoordinatorLoaderImpl.scala  |  1 +
 .../group/CoordinatorLoaderImplTest.scala          | 44 ++++++-------
 .../coordinator/group/GroupCoordinatorShard.java   |  3 +
 .../kafka/coordinator/group/OffsetAndMetadata.java | 53 +++++++++++----
 .../coordinator/group/OffsetMetadataManager.java   | 45 +++++++++----
 .../kafka/coordinator/group/RecordHelpers.java     |  2 +-
 .../group/runtime/CoordinatorPlayback.java         |  2 +
 .../group/runtime/CoordinatorRuntime.java          | 12 ++--
 .../group/runtime/CoordinatorShard.java            |  2 +
 .../group/runtime/SnapshottableCoordinator.java    |  4 +-
 .../group/GroupCoordinatorShardTest.java           | 65 +++++++++++-------
 .../coordinator/group/OffsetAndMetadataTest.java   |  8 ++-
 .../group/OffsetMetadataManagerTest.java           | 76 ++++++++++++++++++++++
 .../group/runtime/CoordinatorRuntimeTest.java      |  4 ++
 14 files changed, 241 insertions(+), 80 deletions(-)

diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
index a02a0e018cc..ed8c32362c3 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorLoaderImpl.scala
@@ -158,6 +158,7 @@ class CoordinatorLoaderImpl[T](
                   numRecords = numRecords + 1
                   try {
                     coordinator.replay(
+                      record.offset(),
                       batch.producerId,
                       batch.producerEpoch,
                       deserializer.deserialize(record.key, record.value)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
index c583455817a..5f91c759c9d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorLoaderImplTest.scala
@@ -176,13 +176,13 @@ class CoordinatorLoaderImplTest {
 
       assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
 
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
-      verify(coordinator).replay(100L, 5.toShort, ("k6", "v6"))
-      verify(coordinator).replay(100L, 5.toShort, ("k7", "v7"))
+      verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(5L, 100L, 5.toShort, ("k6", "v6"))
+      verify(coordinator).replay(6L, 100L, 5.toShort, ("k7", "v7"))
       verify(coordinator).replayEndTransactionMarker(100L, 5, TransactionResult.COMMIT)
       verify(coordinator).replayEndTransactionMarker(500L, 10, TransactionResult.ABORT)
       verify(coordinator).updateLastWrittenOffset(2)
@@ -272,7 +272,7 @@ class CoordinatorLoaderImplTest {
 
       loader.load(tp, coordinator).get(10, TimeUnit.SECONDS)
 
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
     }
   }
 
@@ -462,13 +462,13 @@ class CoordinatorLoaderImplTest {
 
       assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
 
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+      verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+      verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
       verify(coordinator, times(0)).updateLastWrittenOffset(0)
       verify(coordinator, times(1)).updateLastWrittenOffset(2)
       verify(coordinator, times(1)).updateLastWrittenOffset(5)
@@ -563,13 +563,13 @@ class CoordinatorLoaderImplTest {
 
       assertNotNull(loader.load(tp, coordinator).get(10, TimeUnit.SECONDS))
 
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
-      verify(coordinator).replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
+      verify(coordinator).replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k1", "v1"))
+      verify(coordinator).replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k2", "v2"))
+      verify(coordinator).replay(2L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k3", "v3"))
+      verify(coordinator).replay(3L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k4", "v4"))
+      verify(coordinator).replay(4L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k5", "v5"))
+      verify(coordinator).replay(5L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k6", "v6"))
+      verify(coordinator).replay(6L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, ("k7", "v7"))
       verify(coordinator, times(0)).updateLastWrittenOffset(0)
       verify(coordinator, times(0)).updateLastWrittenOffset(2)
       verify(coordinator, times(0)).updateLastWrittenOffset(5)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
index 9a7360af6a7..7671c23f306 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorShard.java
@@ -639,6 +639,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
     /**
      * Replays the Record to update the hard state of the group coordinator.
      *
+     * @param offset        The offset of the record in the log.
      * @param producerId    The producer id.
      * @param producerEpoch The producer epoch.
      * @param record        The record to apply to the state machine.
@@ -646,6 +647,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
      */
     @Override
     public void replay(
+        long offset,
         long producerId,
         short producerEpoch,
         Record record
@@ -657,6 +659,7 @@ public class GroupCoordinatorShard implements CoordinatorShard<Record> {
             case 0:
             case 1:
                 offsetMetadataManager.replay(
+                    offset,
                     producerId,
                     (OffsetCommitKey) key.message(),
                     (OffsetCommitValue) messageOrNull(value)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
index 0eb1afaead5..d2c91694f6b 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetAndMetadata.java
@@ -36,7 +36,7 @@ public class OffsetAndMetadata {
     /**
      * The committed offset.
      */
-    public final long offset;
+    public final long committedOffset;
 
     /**
      * The leader epoch in use when the offset was committed.
@@ -61,14 +61,38 @@ public class OffsetAndMetadata {
      */
     public final OptionalLong expireTimestampMs;
 
+    /**
+     * The offset of the commit record in the log.
+     */
+    public final long recordOffset;
+
+    public OffsetAndMetadata(
+        long committedOffset,
+        OptionalInt leaderEpoch,
+        String metadata,
+        long commitTimestampMs,
+        OptionalLong expireTimestampMs
+    ) {
+        this(
+            -1L,
+            committedOffset,
+            leaderEpoch,
+            metadata,
+            commitTimestampMs,
+            expireTimestampMs
+        );
+    }
+
     public OffsetAndMetadata(
-        long offset,
+        long recordOffset,
+        long committedOffset,
         OptionalInt leaderEpoch,
         String metadata,
         long commitTimestampMs,
         OptionalLong expireTimestampMs
     ) {
-        this.offset = offset;
+        this.recordOffset = recordOffset;
+        this.committedOffset = committedOffset;
         this.leaderEpoch = Objects.requireNonNull(leaderEpoch);
         this.metadata = Objects.requireNonNull(metadata);
         this.commitTimestampMs = commitTimestampMs;
@@ -77,11 +101,12 @@ public class OffsetAndMetadata {
 
     @Override
     public String toString() {
-        return "OffsetAndMetadata(offset=" + offset +
+        return "OffsetAndMetadata(offset=" + committedOffset +
             ", leaderEpoch=" + leaderEpoch +
             ", metadata=" + metadata +
             ", commitTimestampMs=" + commitTimestampMs +
             ", expireTimestampMs=" + expireTimestampMs +
+            ", recordOffset=" + recordOffset +
             ')';
     }
 
@@ -92,20 +117,22 @@ public class OffsetAndMetadata {
 
         OffsetAndMetadata that = (OffsetAndMetadata) o;
 
-        if (offset != that.offset) return false;
+        if (committedOffset != that.committedOffset) return false;
         if (commitTimestampMs != that.commitTimestampMs) return false;
-        if (!leaderEpoch.equals(that.leaderEpoch)) return false;
-        if (!metadata.equals(that.metadata)) return false;
-        return expireTimestampMs.equals(that.expireTimestampMs);
+        if (recordOffset != that.recordOffset) return false;
+        if (!Objects.equals(leaderEpoch, that.leaderEpoch)) return false;
+        if (!Objects.equals(metadata, that.metadata)) return false;
+        return Objects.equals(expireTimestampMs, that.expireTimestampMs);
     }
 
     @Override
     public int hashCode() {
-        int result = (int) (offset ^ (offset >>> 32));
-        result = 31 * result + leaderEpoch.hashCode();
-        result = 31 * result + metadata.hashCode();
+        int result = (int) (committedOffset ^ (committedOffset >>> 32));
+        result = 31 * result + (leaderEpoch != null ? leaderEpoch.hashCode() : 0);
+        result = 31 * result + (metadata != null ? metadata.hashCode() : 0);
         result = 31 * result + (int) (commitTimestampMs ^ (commitTimestampMs >>> 32));
-        result = 31 * result + expireTimestampMs.hashCode();
+        result = 31 * result + (expireTimestampMs != null ? expireTimestampMs.hashCode() : 0);
+        result = 31 * result + (int) (recordOffset ^ (recordOffset >>> 32));
         return result;
     }
 
@@ -113,9 +140,11 @@ public class OffsetAndMetadata {
      * @return An OffsetAndMetadata created from a OffsetCommitValue record.
      */
     public static OffsetAndMetadata fromRecord(
+        long recordOffset,
         OffsetCommitValue record
     ) {
         return new OffsetAndMetadata(
+            recordOffset,
             record.offset(),
             ofSentinel(record.leaderEpoch()),
             record.metadata(),
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 1b02402ce96..855d46a68c4 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -738,7 +738,7 @@ public class OffsetMetadataManager {
                 } else {
                     topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                         .setPartitionIndex(partitionIndex)
-                        .setCommittedOffset(offsetAndMetadata.offset)
+                        .setCommittedOffset(offsetAndMetadata.committedOffset)
                         .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
                         .setMetadata(offsetAndMetadata.metadata));
                 }
@@ -799,7 +799,7 @@ public class OffsetMetadataManager {
                     } else {
                         topicResponse.partitions().add(new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                             .setPartitionIndex(partition)
-                            .setCommittedOffset(offsetAndMetadata.offset)
+                            .setCommittedOffset(offsetAndMetadata.committedOffset)
                             .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(-1))
                             .setMetadata(offsetAndMetadata.metadata));
                     }
@@ -884,12 +884,14 @@ public class OffsetMetadataManager {
     /**
      * Replays OffsetCommitKey/Value to update or delete the corresponding offsets.
      *
-     * @param producerId The producer id of the batch containing the provided
-     *                   key and value.
-     * @param key        A OffsetCommitKey key.
-     * @param value      A OffsetCommitValue value.
+     * @param recordOffset  The offset of the record in the log.
+     * @param producerId    The producer id of the batch containing the provided
+     *                      key and value.
+     * @param key           A OffsetCommitKey key.
+     * @param value         A OffsetCommitValue value.
      */
     public void replay(
+        long recordOffset,
         long producerId,
         OffsetCommitKey key,
         OffsetCommitValue value
@@ -918,7 +920,7 @@ public class OffsetMetadataManager {
                     groupId,
                     topic,
                     partition,
-                    OffsetAndMetadata.fromRecord(value)
+                    OffsetAndMetadata.fromRecord(recordOffset, value)
                 );
                 if (previousValue == null) {
                     metrics.incrementNumOffsets();
@@ -934,7 +936,7 @@ public class OffsetMetadataManager {
                         groupId,
                         topic,
                         partition,
-                        OffsetAndMetadata.fromRecord(value)
+                        OffsetAndMetadata.fromRecord(recordOffset, value)
                     );
                 openTransactionsByGroup
                     .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
@@ -979,15 +981,30 @@ public class OffsetMetadataManager {
             pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
                 topicOffsets.forEach((topicName, partitionOffsets) -> {
                     partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-                        log.debug("Committed transaction offset commit for producer id {} in group {} " +
-                            "with topic {}, partition {}, and offset {}.",
-                            producerId, groupId, topicName, partitionId, offsetAndMetadata);
-                        offsets.put(
+                        OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
                             groupId,
                             topicName,
-                            partitionId,
-                            offsetAndMetadata
+                            partitionId
                         );
+
+                        // We always keep the most recent committed offset when we have a mix of transactional and regular
+                        // offset commits. Without preserving information of the commit record offset, compaction of the
+                        // __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+                        if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+                            log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+                                "with topic {} and partition {}.",
+                                offsetAndMetadata, producerId, groupId, topicName, partitionId);
+                            offsets.put(
+                                groupId,
+                                topicName,
+                                partitionId,
+                                offsetAndMetadata
+                            );
+                        } else {
+                            log.info("Skipped the materialization of transactional offset commit {} for producer id {} in group {} with topic {}, " +
+                                "partition {} since its record offset {} is smaller than the record offset {} of the last committed offset.",
+                                offsetAndMetadata, producerId, groupId, topicName, partitionId, offsetAndMetadata.recordOffset, existingOffsetAndMetadata.recordOffset);
+                        }
                     });
                 });
             });
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
index 5e7e6fa3f44..47ba36c84a3 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java
@@ -513,7 +513,7 @@ public class RecordHelpers {
             ),
             new ApiMessageAndVersion(
                 new OffsetCommitValue()
-                    .setOffset(offsetAndMetadata.offset)
+                    .setOffset(offsetAndMetadata.committedOffset)
                     .setLeaderEpoch(offsetAndMetadata.leaderEpoch.orElse(RecordBatch.NO_PARTITION_LEADER_EPOCH))
                     .setMetadata(offsetAndMetadata.metadata)
                     .setCommitTimestamp(offsetAndMetadata.commitTimestampMs)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
index 9aa5400cc05..ef811f4d664 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorPlayback.java
@@ -30,12 +30,14 @@ public interface CoordinatorPlayback<U> {
     /**
      * Applies the given record to this object.
      *
+     * @param offset        The offset of the record in the log.
      * @param producerId    The producer id.
      * @param producerEpoch The producer epoch.
      * @param record        A record.
      * @throws RuntimeException if the record can not be applied.
      */
     void replay(
+        long offset,
         long producerId,
         short producerEpoch,
         U record
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index f40c99c7ad4..5674273b12d 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -713,13 +713,17 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut
                         try {
                             // Apply the records to the state machine.
                             if (result.replayRecords()) {
-                                result.records().forEach(record ->
+                                // We compute the offset of the record based on the last written offset. The
+                                // coordinator is the single writer to the underlying partition so we can
+                                // deduce it like this.
+                                for (int i = 0; i < result.records().size(); i++) {
                                     context.coordinator.replay(
+                                        prevLastWrittenOffset + i,
                                         producerId,
                                         producerEpoch,
-                                        record
-                                    )
-                                );
+                                        result.records().get(i)
+                                    );
+                                }
                             }
 
                             // Write the records to the log and update the last written
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
index 391acff144f..5581f333873 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorShard.java
@@ -52,11 +52,13 @@ public interface CoordinatorShard<U> {
     /**
      * Replay a record to update the state machine.
      *
+     * @param offset        The offset of the record in the log.
      * @param producerId    The producer id.
      * @param producerEpoch The producer epoch.
      * @param record        The record to replay.
      */
     void replay(
+        long offset,
         long producerId,
         short producerEpoch,
         U record
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
index c5c6995c37d..cf428402897 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/SnapshottableCoordinator.java
@@ -99,17 +99,19 @@ class SnapshottableCoordinator<S extends CoordinatorShard<U>, U> implements Coor
     /**
      * Replays the record onto the state machine.
      *
+     * @param offset        The offset of the record in the log.
      * @param producerId    The producer id.
      * @param producerEpoch The producer epoch.
      * @param record        A record.
      */
     @Override
     public synchronized void replay(
+        long offset,
         long producerId,
         short producerEpoch,
         U record
     ) {
-        coordinator.replay(producerId, producerEpoch, record);
+        coordinator.replay(offset, producerId, producerEpoch, record);
     }
 
     /**
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
index 740ad2413c1..4637517cba2 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorShardTest.java
@@ -325,17 +325,25 @@ public class GroupCoordinatorShardTest {
         OffsetCommitKey key = new OffsetCommitKey();
         OffsetCommitValue value = new OffsetCommitValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 0),
             new ApiMessageAndVersion(value, (short) 0)
         ));
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 1),
             new ApiMessageAndVersion(value, (short) 0)
         ));
 
-        verify(offsetMetadataManager, times(2)).replay(
+        verify(offsetMetadataManager, times(1)).replay(
+            0L,
+            RecordBatch.NO_PRODUCER_ID,
+            key,
+            value
+        );
+
+        verify(offsetMetadataManager, times(1)).replay(
+            1L,
             RecordBatch.NO_PRODUCER_ID,
             key,
             value
@@ -362,23 +370,25 @@ public class GroupCoordinatorShardTest {
         OffsetCommitKey key = new OffsetCommitKey();
         OffsetCommitValue value = new OffsetCommitValue();
 
-        coordinator.replay(100L, (short) 0, new Record(
+        coordinator.replay(0L, 100L, (short) 0, new Record(
             new ApiMessageAndVersion(key, (short) 0),
             new ApiMessageAndVersion(value, (short) 0)
         ));
 
-        coordinator.replay(101L, (short) 1, new Record(
+        coordinator.replay(1L, 101L, (short) 1, new Record(
             new ApiMessageAndVersion(key, (short) 1),
             new ApiMessageAndVersion(value, (short) 0)
         ));
 
         verify(offsetMetadataManager, times(1)).replay(
+            0L,
             100L,
             key,
             value
         );
 
         verify(offsetMetadataManager, times(1)).replay(
+            1L,
             101L,
             key,
             value
@@ -404,17 +414,25 @@ public class GroupCoordinatorShardTest {
 
         OffsetCommitKey key = new OffsetCommitKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 0),
             null
         ));
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(1L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 1),
             null
         ));
 
-        verify(offsetMetadataManager, times(2)).replay(
+        verify(offsetMetadataManager, times(1)).replay(
+            0L,
+            RecordBatch.NO_PRODUCER_ID,
+            key,
+            null
+        );
+
+        verify(offsetMetadataManager, times(1)).replay(
+            1L,
             RecordBatch.NO_PRODUCER_ID,
             key,
             null
@@ -441,7 +459,7 @@ public class GroupCoordinatorShardTest {
         ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
         ConsumerGroupMetadataValue value = new ConsumerGroupMetadataValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 3),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -468,7 +486,7 @@ public class GroupCoordinatorShardTest {
 
         ConsumerGroupMetadataKey key = new ConsumerGroupMetadataKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 3),
             null
         ));
@@ -496,7 +514,7 @@ public class GroupCoordinatorShardTest {
         ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
         ConsumerGroupPartitionMetadataValue value = new ConsumerGroupPartitionMetadataValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 4),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -523,7 +541,7 @@ public class GroupCoordinatorShardTest {
 
         ConsumerGroupPartitionMetadataKey key = new ConsumerGroupPartitionMetadataKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 4),
             null
         ));
@@ -551,7 +569,7 @@ public class GroupCoordinatorShardTest {
         ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
         ConsumerGroupMemberMetadataValue value = new ConsumerGroupMemberMetadataValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 5),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -578,7 +596,7 @@ public class GroupCoordinatorShardTest {
 
         ConsumerGroupMemberMetadataKey key = new ConsumerGroupMemberMetadataKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 5),
             null
         ));
@@ -606,7 +624,7 @@ public class GroupCoordinatorShardTest {
         ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
         ConsumerGroupTargetAssignmentMetadataValue value = new ConsumerGroupTargetAssignmentMetadataValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 6),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -633,7 +651,7 @@ public class GroupCoordinatorShardTest {
 
         ConsumerGroupTargetAssignmentMetadataKey key = new ConsumerGroupTargetAssignmentMetadataKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 6),
             null
         ));
@@ -661,7 +679,7 @@ public class GroupCoordinatorShardTest {
         ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
         ConsumerGroupTargetAssignmentMemberValue value = new ConsumerGroupTargetAssignmentMemberValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 7),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -688,7 +706,7 @@ public class GroupCoordinatorShardTest {
 
         ConsumerGroupTargetAssignmentMemberKey key = new ConsumerGroupTargetAssignmentMemberKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 7),
             null
         ));
@@ -716,7 +734,7 @@ public class GroupCoordinatorShardTest {
         ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
         ConsumerGroupCurrentMemberAssignmentValue value = new ConsumerGroupCurrentMemberAssignmentValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 8),
             new ApiMessageAndVersion(value, (short) 0)
         ));
@@ -743,7 +761,7 @@ public class GroupCoordinatorShardTest {
 
         ConsumerGroupCurrentMemberAssignmentKey key = new ConsumerGroupCurrentMemberAssignmentKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 8),
             null
         ));
@@ -770,6 +788,7 @@ public class GroupCoordinatorShardTest {
 
         assertThrows(NullPointerException.class, () ->
             coordinator.replay(
+                0L,
                 RecordBatch.NO_PRODUCER_ID,
                 RecordBatch.NO_PRODUCER_EPOCH,
                 new Record(null, null))
@@ -797,7 +816,7 @@ public class GroupCoordinatorShardTest {
         ConsumerGroupCurrentMemberAssignmentValue value = new ConsumerGroupCurrentMemberAssignmentValue();
 
         assertThrows(IllegalStateException.class, () ->
-            coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+            coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
                 new ApiMessageAndVersion(key, (short) 255),
                 new ApiMessageAndVersion(value, (short) 0)
             ))
@@ -852,7 +871,7 @@ public class GroupCoordinatorShardTest {
         GroupMetadataKey key = new GroupMetadataKey();
         GroupMetadataValue value = new GroupMetadataValue();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 2),
             new ApiMessageAndVersion(value, (short) 4)
         ));
@@ -879,7 +898,7 @@ public class GroupCoordinatorShardTest {
 
         GroupMetadataKey key = new GroupMetadataKey();
 
-        coordinator.replay(RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
+        coordinator.replay(0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, new Record(
             new ApiMessageAndVersion(key, (short) 2),
             null
         ));
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
index 46fe369974b..5ff3a243cbe 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetAndMetadataTest.java
@@ -38,7 +38,7 @@ public class OffsetAndMetadataTest {
             OptionalLong.of(5678L)
         );
 
-        assertEquals(100L, offsetAndMetadata.offset);
+        assertEquals(100L, offsetAndMetadata.committedOffset);
         assertEquals(OptionalInt.of(10), offsetAndMetadata.leaderEpoch);
         assertEquals("metadata", offsetAndMetadata.metadata);
         assertEquals(1234L, offsetAndMetadata.commitTimestampMs);
@@ -55,24 +55,26 @@ public class OffsetAndMetadataTest {
             .setExpireTimestamp(-1L);
 
         assertEquals(new OffsetAndMetadata(
+            10L,
             100L,
             OptionalInt.empty(),
             "metadata",
             1234L,
             OptionalLong.empty()
-        ), OffsetAndMetadata.fromRecord(record));
+        ), OffsetAndMetadata.fromRecord(10L, record));
 
         record
             .setLeaderEpoch(12)
             .setExpireTimestamp(5678L);
 
         assertEquals(new OffsetAndMetadata(
+            11L,
             100L,
             OptionalInt.of(12),
             "metadata",
             1234L,
             OptionalLong.of(5678L)
-        ), OffsetAndMetadata.fromRecord(record));
+        ), OffsetAndMetadata.fromRecord(11L, record));
     }
 
     @Test
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index c0b2b7e2a68..afd8e2eefb1 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -461,6 +461,7 @@ public class OffsetMetadataManagerTest {
             switch (key.version()) {
                 case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION:
                     offsetMetadataManager.replay(
+                        lastWrittenOffset,
                         producerId,
                         (OffsetCommitKey) key.message(),
                         (OffsetCommitValue) messageOrNull(value)
@@ -2515,6 +2516,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
 
         verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
+            0L,
             100L,
             OptionalInt.empty(),
             "small",
@@ -2523,6 +2525,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
+            1L,
             200L,
             OptionalInt.of(10),
             "small",
@@ -2531,6 +2534,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata(
+            2L,
             200L,
             OptionalInt.of(10),
             "small",
@@ -2539,6 +2543,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyReplay(context, "foo", "bar", 1, new OffsetAndMetadata(
+            3L,
             300L,
             OptionalInt.of(10),
             "small",
@@ -2552,6 +2557,7 @@ public class OffsetMetadataManagerTest {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
 
         verifyTransactionalReplay(context, 5, "foo", "bar", 0, new OffsetAndMetadata(
+            0L,
             100L,
             OptionalInt.empty(),
             "small",
@@ -2560,6 +2566,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyTransactionalReplay(context, 5, "foo", "bar", 1, new OffsetAndMetadata(
+            1L,
             101L,
             OptionalInt.empty(),
             "small",
@@ -2568,6 +2575,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyTransactionalReplay(context, 5, "bar", "zar", 0, new OffsetAndMetadata(
+            2L,
             100L,
             OptionalInt.empty(),
             "small",
@@ -2576,6 +2584,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyTransactionalReplay(context, 5, "bar", "zar", 1, new OffsetAndMetadata(
+            3L,
             101L,
             OptionalInt.empty(),
             "small",
@@ -2584,6 +2593,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyTransactionalReplay(context, 6, "foo", "bar", 2, new OffsetAndMetadata(
+            4L,
             102L,
             OptionalInt.empty(),
             "small",
@@ -2592,6 +2602,7 @@ public class OffsetMetadataManagerTest {
         ));
 
         verifyTransactionalReplay(context, 6, "foo", "bar", 3, new OffsetAndMetadata(
+            5L,
             102L,
             OptionalInt.empty(),
             "small",
@@ -2606,6 +2617,7 @@ public class OffsetMetadataManagerTest {
 
         // Verify replay adds the offset the map.
         verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
+            0L,
             100L,
             OptionalInt.empty(),
             "small",
@@ -2628,8 +2640,19 @@ public class OffsetMetadataManagerTest {
     public void testReplayTransactionEndMarkerWithCommit() {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
 
+        // Add regular offset commit.
+        verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
+            0L,
+            99L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
         // Add pending transactional commit for producer id 5.
         verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata(
+            1L,
             100L,
             OptionalInt.empty(),
             "small",
@@ -2639,6 +2662,7 @@ public class OffsetMetadataManagerTest {
 
         // Add pending transactional commit for producer id 6.
         verifyTransactionalReplay(context, 6L, "foo", "bar", 1, new OffsetAndMetadata(
+            2L,
             200L,
             OptionalInt.empty(),
             "small",
@@ -2662,6 +2686,7 @@ public class OffsetMetadataManagerTest {
 
         // ... and added to the main offset storage.
         assertEquals(new OffsetAndMetadata(
+            1L,
             100L,
             OptionalInt.empty(),
             "small",
@@ -2691,6 +2716,57 @@ public class OffsetMetadataManagerTest {
         ));
     }
 
+    @Test
+    public void testReplayTransactionEndMarkerKeepsTheMostRecentCommittedOffset() {
+        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
+
+        // Add pending transactional offset commit for producer id 5.
+        verifyTransactionalReplay(context, 5L, "foo", "bar", 0, new OffsetAndMetadata(
+            0L,
+            100L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        // Add regular offset commit.
+        verifyReplay(context, "foo", "bar", 0, new OffsetAndMetadata(
+            1L,
+            101L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ));
+
+        // Replaying an end marker to commit transaction of producer id 5.
+        context.replayEndTransactionMarker(5L, TransactionResult.COMMIT);
+
+        // The pending offset is removed...
+        assertNull(context.offsetMetadataManager.pendingTransactionalOffset(
+            5L,
+            "foo",
+            "bar",
+            0
+        ));
+
+        // ... but it is not added to the main storage because the regular
+        // committed offset is more recent.
+        assertEquals(new OffsetAndMetadata(
+            1L,
+            101L,
+            OptionalInt.empty(),
+            "small",
+            context.time.milliseconds(),
+            OptionalLong.empty()
+        ), context.offsetMetadataManager.offset(
+            "foo",
+            "bar",
+            0
+        ));
+    }
+
     @Test
     public void testOffsetCommitsSensor() {
         OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index 0a5c1b5f537..ba1a340e8e1 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -274,6 +274,7 @@ public class CoordinatorRuntimeTest {
 
         @Override
         public void replay(
+            long offset,
             long producerId,
             short producerEpoch,
             String record
@@ -1034,6 +1035,7 @@ public class CoordinatorRuntimeTest {
             new MockCoordinatorShard(snapshotRegistry, ctx.timer) {
                 @Override
                 public void replay(
+                    long offset,
                     long producerId,
                     short producerEpoch,
                     String record
@@ -1213,11 +1215,13 @@ public class CoordinatorRuntimeTest {
         // Verify that the coordinator got the records with the correct
         // producer id and producer epoch.
         verify(coordinator, times(1)).replay(
+            eq(0L),
             eq(100L),
             eq((short) 50),
             eq("record1")
         );
         verify(coordinator, times(1)).replay(
+            eq(1L),
             eq(100L),
             eq((short) 50),
             eq("record2")