Posted to commits@kafka.apache.org by ce...@apache.org on 2023/05/01 16:42:40 UTC

[kafka] 03/03: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 402cce07961dbbe1ac68f2e28497d28e9d4e5edf
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Wed Apr 26 00:30:13 2023 -0700

    KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)
    
    Reviewers: Daniel Urban <du...@cloudera.com>, Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  57 ++++--
 .../kafka/connect/mirror/OffsetSyncStore.java      | 192 ++++++++++++++++++++-
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  70 ++++++--
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  | 156 +++++++++++++++--
 .../MirrorConnectorsIntegrationBaseTest.java       |  88 +++++++++-
 5 files changed, 514 insertions(+), 49 deletions(-)
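For orientation, the change below adds an in-memory, roughly exponentially spaced index of offset syncs per topic-partition, so that consumer groups lagging far behind the replication flow can still have their offsets translated, at reduced precision. The committed implementation is the 64-slot array in OffsetSyncStore governed by invariants A-D in the Javadoc further down; the list-based sketch here is hypothetical, with invented class and method names, and only illustrates the idea.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.OptionalLong;

    // Hypothetical sketch only -- not the committed OffsetSyncStore.
    // It keeps (upstream, downstream) sync points spaced roughly exponentially behind
    // the latest sync, so old upstream offsets stay translatable with coarser precision.
    class SparseSyncIndexSketch {
        static final class Sync {
            final long upstream;
            final long downstream;
            Sync(long upstream, long downstream) {
                this.upstream = upstream;
                this.downstream = downstream;
            }
        }

        // syncs.get(0) is always the most recent sync; later entries are older.
        private final List<Sync> syncs = new ArrayList<>();

        void sync(long upstream, long downstream) {
            // An upstream rewind invalidates every older sync.
            if (!syncs.isEmpty() && syncs.get(0).upstream > upstream) {
                syncs.clear();
            }
            syncs.add(0, new Sync(upstream, downstream));
            // Keep an older sync only if it is at least twice as far behind the latest
            // as the previously kept one, and always keep the very oldest sync. This
            // bounds the index to O(log(offset range)) entries.
            List<Sync> kept = new ArrayList<>();
            kept.add(syncs.get(0));
            long lastDistance = 1;
            for (int i = 1; i < syncs.size(); i++) {
                long distance = syncs.get(0).upstream - syncs.get(i).upstream;
                boolean oldest = i == syncs.size() - 1;
                if (oldest || distance >= 2 * lastDistance) {
                    kept.add(syncs.get(i));
                    lastDistance = Math.max(distance, 1);
                }
            }
            syncs.clear();
            syncs.addAll(kept);
        }

        OptionalLong translate(long upstreamGroupOffset) {
            // Use the closest sync at or below the group's offset; add one if the group
            // is strictly ahead of that sync (the same convention MirrorMaker 2 uses).
            for (Sync s : syncs) {
                if (s.upstream <= upstreamGroupOffset) {
                    long step = s.upstream == upstreamGroupOffset ? 0 : 1;
                    return OptionalLong.of(s.downstream + step);
                }
            }
            // The offset predates everything kept: not accurately translatable.
            return syncs.isEmpty() ? OptionalLong.empty() : OptionalLong.of(-1L);
        }
    }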

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 72248fd9f6a..150a55abf51 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -42,6 +42,7 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.Collections;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.concurrent.ExecutionException;
 import java.time.Duration;
@@ -67,20 +68,21 @@ public class MirrorCheckpointTask extends SourceTask {
     private MirrorCheckpointMetrics metrics;
     private Scheduler scheduler;
     private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
-    private Map<String, List<Checkpoint>> checkpointsPerConsumerGroup;
+    private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
     public MirrorCheckpointTask() {}
 
     // for testing
     MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
             ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
             Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
-            Map<String, List<Checkpoint>> checkpointsPerConsumerGroup) {
+            Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
         this.sourceClusterAlias = sourceClusterAlias;
         this.targetClusterAlias = targetClusterAlias;
         this.replicationPolicy = replicationPolicy;
         this.offsetSyncStore = offsetSyncStore;
         this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
         this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
+        this.topicFilter = topic -> true;
     }
 
     @Override
@@ -165,9 +167,11 @@ public class MirrorCheckpointTask extends SourceTask {
     private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
         try {
             long timestamp = System.currentTimeMillis();
-            List<Checkpoint> checkpoints = checkpointsForGroup(group);
-            checkpointsPerConsumerGroup.put(group, checkpoints);
-            return checkpoints.stream()
+            Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = listConsumerGroupOffsets(group);
+            Map<TopicPartition, Checkpoint> newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group);
+            Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
+            oldCheckpoints.putAll(newCheckpoints);
+            return newCheckpoints.values().stream()
                 .map(x -> checkpointRecord(x, timestamp))
                 .collect(Collectors.toList());
         } catch (ExecutionException e) {
@@ -176,13 +180,44 @@ public class MirrorCheckpointTask extends SourceTask {
         }
     }
 
-    private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
-        return listConsumerGroupOffsets(group).entrySet().stream()
+    // for testing
+    Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) {
+        return upstreamGroupOffsets.entrySet().stream()
             .filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
             .map(x -> checkpoint(group, x.getKey(), x.getValue()))
             .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
             .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately
-            .collect(Collectors.toList());
+            .filter(this::checkpointIsMoreRecent) // do not emit checkpoints for partitions that have a later checkpoint
+            .collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
+    }
+
+    private boolean checkpointIsMoreRecent(Checkpoint checkpoint) {
+        Map<TopicPartition, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
+        if (checkpoints == null) {
+            log.trace("Emitting {} (first for this group)", checkpoint);
+            return true;
+        }
+        Checkpoint lastCheckpoint = checkpoints.get(checkpoint.topicPartition());
+        if (lastCheckpoint == null) {
+            log.trace("Emitting {} (first for this partition)", checkpoint);
+            return true;
+        }
+        // Emit sync after a rewind of the upstream consumer group takes place (checkpoints can be non-monotonic)
+        if (checkpoint.upstreamOffset() < lastCheckpoint.upstreamOffset()) {
+            log.trace("Emitting {} (upstream offset rewind)", checkpoint);
+            return true;
+        }
+        // Or if the downstream offset is newer (force checkpoints to be monotonic)
+        if (checkpoint.downstreamOffset() > lastCheckpoint.downstreamOffset()) {
+            log.trace("Emitting {} (downstream offset advanced)", checkpoint);
+            return true;
+        }
+        if (checkpoint.downstreamOffset() != lastCheckpoint.downstreamOffset()) {
+            log.trace("Skipping {} (preventing downstream rewind)", checkpoint);
+        } else {
+            log.trace("Skipping {} (repeated checkpoint)", checkpoint);
+        }
+        return false;
     }
 
     private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
@@ -199,7 +234,7 @@ public class MirrorCheckpointTask extends SourceTask {
         if (offsetAndMetadata != null) {
             long upstreamOffset = offsetAndMetadata.offset();
             OptionalLong downstreamOffset =
-                offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+                offsetSyncStore.translateDownstream(group, topicPartition, upstreamOffset);
             if (downstreamOffset.isPresent()) {
                 return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
                     upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
@@ -334,10 +369,10 @@ public class MirrorCheckpointTask extends SourceTask {
     Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
         Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
 
-        for (Entry<String, List<Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
+        for (Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
             String consumerId = entry.getKey();
             Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
-            for (Checkpoint checkpoint : entry.getValue()) {
+            for (Checkpoint checkpoint : entry.getValue().values()) {
                 convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
             }
             result.put(consumerId, convertedUpstreamOffset);
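To make the new checkpointIsMoreRecent filter above concrete, here is a hypothetical sequence for one consumer group and partition (offsets invented for illustration), with each candidate compared against the last checkpoint emitted up to that point:

    // last emitted checkpoint for the partition: upstream=100, downstream=200
    // candidate upstream=100, downstream=200 -> skipped (repeated checkpoint)
    // candidate upstream=120, downstream=195 -> skipped (would rewind the downstream offset)
    // candidate upstream=120, downstream=210 -> emitted (downstream offset advanced)
    // candidate upstream= 60, downstream=150 -> emitted (upstream consumer group rewound)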
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 0169446aa04..9222d314a43 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -25,17 +25,46 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
+ *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream >= syncs[j].upstream + 2^(i-2)</li>
+ *     <li>Invariant D: syncs[63] is the earliest offset sync from the syncs topic usable for translation</li>
+ * </ul>
+ * <p>The above invariants ensure that the store is kept updated upon receipt of each sync, and that distinct
+ * offset syncs are separated by approximately exponential space. They can be checked locally (by comparing all adjacent
+ * indexes) but hold globally (for all pairs of any distance). This allows updates to the store in linear time.
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, if replication
+ * started after the position of the consumer group, or if relevant offset syncs for the topic were potentially used
+ * for translation in an earlier generation of the sync store.
+ */
 class OffsetSyncStore implements AutoCloseable {
+
+    private static final Logger log = LoggerFactory.getLogger(OffsetSyncStore.class);
+    // Store one offset sync for each bit of the topic offset.
+    // Visible for testing
+    static final int SYNCS_PER_PARTITION = Long.SIZE;
     private final KafkaBasedLog<byte[], byte[]> backingStore;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final Map<TopicPartition, OffsetSync[]> offsetSyncs = new ConcurrentHashMap<>();
     private final TopicAdmin admin;
     protected volatile boolean readToEnd = false;
 
@@ -99,16 +128,21 @@ class OffsetSyncStore implements AutoCloseable {
         readToEnd = true;
     }
 
-    OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+    OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) {
         if (!readToEnd) {
             // If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
             // This prevents emitting stale offsets while initially reading the offset syncs topic.
+            log.debug("translateDownstream({},{},{}): Skipped (initial offset syncs read still in progress)",
+                    group, sourceTopicPartition, upstreamOffset);
             return OptionalLong.empty();
         }
-        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
+        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition, upstreamOffset);
         if (offsetSync.isPresent()) {
             if (offsetSync.get().upstreamOffset() > upstreamOffset) {
                 // Offset is too far in the past to translate accurately
+                log.debug("translateDownstream({},{},{}): Skipped ({} is ahead of upstream consumer group {})",
+                        group, sourceTopicPartition, upstreamOffset,
+                        offsetSync.get(), upstreamOffset);
                 return OptionalLong.of(-1L);
             }
             // If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
@@ -124,8 +158,15 @@ class OffsetSyncStore implements AutoCloseable {
             //          vv
             // target |-sg----r-----|
             long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
+            log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
+                    group, sourceTopicPartition, upstreamOffset,
+                    offsetSync.get().downstreamOffset() + upstreamStep,
+                    offsetSync.get()
+            );
             return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
         } else {
+            log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
+                    group, sourceTopicPartition, upstreamOffset);
             return OptionalLong.empty();
         }
     }
@@ -139,10 +180,147 @@ class OffsetSyncStore implements AutoCloseable {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // While reading to the end of the topic, ensure that our earliest sync is later than
+        // any earlier sync that could have been used for translation, to preserve monotonicity
+        // If the upstream offset rewinds, all previous offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        // The most-recently-discarded offset sync
+        // We track this since it may still be eligible for use in the syncs array at a later index
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // We can potentially use oldValue instead of replacement, allowing us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            // This replacement may or may not be used, since the invariants could already be satisfied,
+            // and in that case, prefer to keep the existing tail of the syncs array rather than updating it.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, current);
+                assert invariantC(syncs[previous], syncs[current], previous);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+    }
+
+    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
+        long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));
+        return iSync == jSync || (bound >= 0 && iSync.upstreamOffset() >= bound);
+    }
+
+    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long upstreamOffset) {
+        return Optional.ofNullable(offsetSyncs.get(topicPartition))
+                .map(syncs -> lookupLatestSync(syncs, upstreamOffset));
+    }
+
+
+    private OffsetSync lookupLatestSync(OffsetSync[] syncs, long upstreamOffset) {
+        // linear search the syncs, effectively a binary search over the topic offsets
+        // Search from latest to earliest to find the sync that gives the best accuracy
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            OffsetSync offsetSync = syncs[i];
+            if (offsetSync.upstreamOffset() <= upstreamOffset) {
+                return offsetSync;
+            }
+        }
+        return syncs[SYNCS_PER_PARTITION - 1];
     }
 
-    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
-        return Optional.ofNullable(offsetSyncs.get(topicPartition));
+    // For testing
+    OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
+        OffsetSync[] syncs = offsetSyncs.get(topicPartition);
+        if (syncs == null)
+            throw new IllegalArgumentException("No syncs present for " + topicPartition);
+        if (syncIdx >= syncs.length)
+            throw new IllegalArgumentException(
+                    "Requested sync " + (syncIdx + 1) + " for " + topicPartition
+                            + " but there are only " + syncs.length + " syncs available for that topic partition"
+            );
+        return syncs[syncIdx];
     }
 }
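As a concrete illustration of the precision trade-off, the new testPastOffsetTranslation case below drives the store with one sync every 10 offsets and checks which syncs survive. A condensed, hedged excerpt of that sequence (FakeOffsetSyncStore and its sync(tp, upstream, downstream) helper are the test fixtures added further down in this commit; the TopicPartition here is illustrative):

    try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
        TopicPartition tp = new TopicPartition("topic1", 0);
        for (long offset = 0; offset <= 1000; offset += 10) {
            store.sync(tp, offset, offset);      // syncs seen during the initial read-to-end
        }
        store.start();
        for (long offset = 1010; offset <= 10000; offset += 10) {
            store.sync(tp, offset, offset);      // syncs seen after startup
        }
        // Offsets near the live end translate precisely:
        store.translateDownstream(null, tp, 10000);  // OptionalLong.of(10000)
        // Offsets further behind translate against a sparser, older sync, so precision
        // degrades; the checkpoint task's filter (above) keeps these coarser results
        // from rewinding a group that was already checkpointed further ahead:
        store.translateDownstream(null, tp, 4839);   // OptionalLong.of(1001), relative to the sync at 1000
        // Offsets before the last pre-startup sync (1000) cannot be translated accurately:
        store.translateDownstream(null, tp, 999);    // OptionalLong.of(-1)
    }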
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 500cb6c131a..f9bc7bc76cb 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -16,12 +16,11 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -31,6 +30,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MirrorCheckpointTaskTest {
@@ -118,7 +118,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testSyncOffset() {
         Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
-        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+        Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
 
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
@@ -147,16 +147,16 @@ public class MirrorCheckpointTaskTest {
         // 'cpC2T2p0' denotes 'checkpoint' of topic2, partition 0 for consumer2
         Checkpoint cpC2T2P0 = new Checkpoint(consumer2, new TopicPartition(topic2, 0), 100, 51, "metadata");
 
-        // 'checkpointListC1' denotes 'checkpoint' list for consumer1
-        List<Checkpoint> checkpointListC1 = new ArrayList<>();
-        checkpointListC1.add(cpC1T1P0);
+        // 'checkpointMapC1' denotes 'checkpoint' map for consumer1
+        Map<TopicPartition, Checkpoint> checkpointMapC1 = new HashMap<>();
+        checkpointMapC1.put(cpC1T1P0.topicPartition(), cpC1T1P0);
 
-        // 'checkpointListC2' denotes 'checkpoint' list for consumer2
-        List<Checkpoint> checkpointListC2 = new ArrayList<>();
-        checkpointListC2.add(cpC2T2P0);
+        // 'checkpointMapC2' denotes 'checkpoint' map for consumer2
+        Map<TopicPartition, Checkpoint> checkpointMapC2 = new HashMap<>();
+        checkpointMapC2.put(cpC2T2P0.topicPartition(), cpC2T2P0);
 
-        checkpointsPerConsumerGroup.put(consumer1, checkpointListC1);
-        checkpointsPerConsumerGroup.put(consumer2, checkpointListC2);
+        checkpointsPerConsumerGroup.put(consumer1, checkpointMapC1);
+        checkpointsPerConsumerGroup.put(consumer2, checkpointMapC2);
 
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);
@@ -195,4 +195,52 @@ public class MirrorCheckpointTaskTest {
         Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
         assertFalse(checkpoint.isPresent());
     }
+
+    @Test
+    public void testCheckpointRecordsMonotonicIfStoreRewinds() {
+        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
+        Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+                new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), checkpointsPerConsumerGroup);
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        TopicPartition targetTP = new TopicPartition("source1.topic1", 0);
+
+        long upstream = 11L;
+        long downstream = 4L;
+        // Emit syncs 0 and 1, and use the sync 1 to translate offsets and commit checkpoints
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        long consumerGroupOffset = upstream;
+        long expectedDownstreamOffset = downstream;
+        assertEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
+        Map<TopicPartition, Checkpoint> checkpoints = assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, true);
+
+        // the task normally does this, but simulate it here
+        checkpointsPerConsumerGroup.put("g1", checkpoints);
+
+        // Emit syncs 2-6 which will cause the store to drop sync 1, forcing translation to fall back to 0.
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        // The OffsetSyncStore will change its translation of the same offset
+        assertNotEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
+        // But the task will filter this out and not emit a checkpoint
+        assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, false);
+
+        // If then the upstream offset rewinds in the topic and is still translatable, a checkpoint will be emitted
+        // also rewinding the downstream offsets to match. This will not affect auto-synced groups, only checkpoints.
+        assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset - 1, true);
+    }
+
+    private Map<TopicPartition, Checkpoint> assertCheckpointForTopic(
+            MirrorCheckpointTask task, TopicPartition tp, TopicPartition remoteTp, long consumerGroupOffset, boolean truth
+    ) {
+        Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = Collections.singletonMap(tp, new OffsetAndMetadata(consumerGroupOffset));
+        Map<TopicPartition, Checkpoint> checkpoints = task.checkpointsForGroup(consumerGroupOffsets, "g1");
+        assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? "" : " not") + " emit offset sync");
+        return checkpoints;
+    }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 163e5b72250..b2fa9fa4da3 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 import java.util.OptionalLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class OffsetSyncStoreTest {
 
@@ -57,22 +58,22 @@ public class OffsetSyncStoreTest {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
@@ -80,21 +81,21 @@ public class OffsetSyncStoreTest {
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
@@ -102,7 +103,132 @@ public class OffsetSyncStoreTest {
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewinding upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs
+        // which provide the best translation accuracy (expires as few syncs as possible)
+        // Each new sync should be added to the cache and expire at most one other sync from the cache
+        long iterations = 10000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step * 2) + 1)  {
+            for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
+                try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                    int lastCount = 1;
+                    store.start();
+                    for (long offset = firstOffset; offset <= iterations; offset += step) {
+                        store.sync(tp, offset, offset);
+                        // Invariant A: the latest sync is present
+                        assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                        // Invariant D: the earliest sync is present
+                        assertEquals(firstOffset, store.syncFor(tp, 63).upstreamOffset());
+                        int count = countDistinctStoredSyncs(store, tp);
+                        int diff = count - lastCount;
+                        assertTrue(diff >= 0,
+                                "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                        lastCount = count;
+                    }
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i < j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                int exponent = Math.max(i - 2, 0);
+                long iUpstreamLowerBound = jUpstream + (1L << exponent);
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstream >= iUpstreamLowerBound,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound
+                                + " (" + jUpstream + " + 2^" + exponent + ")"
+                );
+                long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
+                if (iUpstreamUpperBound < 0)
+                    continue;
+                assertTrue(
+                        iUpstream <= iUpstreamUpperBound,
+                        "Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be no greater than " + iUpstreamUpperBound
+                                + " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")"
+
+                );
+            }
         }
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 27579d3275f..7c860049318 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -509,7 +509,6 @@ public class MirrorConnectorsIntegrationBaseTest {
 
             waitForConsumerGroupFullSync(backup, Arrays.asList(backupTopic1, remoteTopic2),
                     consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
             assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
         }
 
@@ -655,6 +654,71 @@ public class MirrorConnectorsIntegrationBaseTest {
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
+    @Test
+    public void testOffsetTranslationBehindReplicationFlow() throws InterruptedException {
+        String consumerGroupName = "consumer-group-lagging-behind";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        warmUpConsumer(consumerProps);
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        // Produce a large number of records to the topic, all replicated within one MM2 lifetime.
+        int iterations = 100;
+        for (int i = 0; i < iterations; i++) {
+            produceMessages(primary, "test-topic-1");
+        }
+        waitForTopicCreated(backup, remoteTopic);
+        assertEquals(iterations * NUM_RECORDS_PRODUCED, backup.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, remoteTopic).count(),
+                "Records were not replicated to backup cluster.");
+        // Once the replication has finished, we spin up the upstream consumer and start slowly consuming records
+        ConsumerRecords<byte[], byte[]> allRecords = primary.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_CONSUME_DURATION_MS, "test-topic-1");
+        MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
+        Map<TopicPartition, OffsetAndMetadata> initialCheckpoints = waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic);
+        Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
+        log.info("Initial checkpoints: {}", initialCheckpoints);
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.commitSync(partialOffsets(allRecords, 0.9f));
+            partialCheckpoints = waitForNewCheckpointOnAllPartitions(
+                    backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, initialCheckpoints);
+            log.info("Partial checkpoints: {}", partialCheckpoints);
+        }
+
+        for (TopicPartition tp : initialCheckpoints.keySet()) {
+            assertTrue(initialCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
+                    "Checkpoints should advance when the upstream consumer group advances");
+        }
+
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+
+        Map<TopicPartition, OffsetAndMetadata> finalCheckpoints;
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            primaryConsumer.commitSync(partialOffsets(allRecords, 0.1f));
+            finalCheckpoints = waitForNewCheckpointOnAllPartitions(
+                    backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, partialCheckpoints);
+            log.info("Final checkpoints: {}", finalCheckpoints);
+        }
+
+        for (TopicPartition tp : partialCheckpoints.keySet()) {
+            assertTrue(finalCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
+                    "Checkpoints should rewind when the upstream consumer group rewinds");
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> partialOffsets(ConsumerRecords<byte[], byte[]> allRecords, double fraction) {
+        return allRecords.partitions()
+                .stream()
+                .collect(Collectors.toMap(Function.identity(), partition -> {
+                    List<ConsumerRecord<byte[], byte[]>> records = allRecords.records(partition);
+                    int index = (int) (records.size() * fraction);
+                    return new OffsetAndMetadata(records.get(index).offset());
+                }));
+    }
 
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
@@ -798,6 +862,13 @@ public class MirrorConnectorsIntegrationBaseTest {
 
     private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
             MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        return waitForNewCheckpointOnAllPartitions(client, consumerGroupName, remoteClusterAlias, topicName, Collections.emptyMap());
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions(
+                MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName,
+                Map<TopicPartition, OffsetAndMetadata> lastCheckpoint
     ) throws InterruptedException {
         AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
         waitForCondition(
@@ -805,9 +876,13 @@ public class MirrorConnectorsIntegrationBaseTest {
                     Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
                             consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
                     for (int i = 0; i < NUM_PARTITIONS; i++) {
-                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                        TopicPartition tp = new TopicPartition(topicName, i);
+                        if (!offsets.containsKey(tp)) {
                             log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
                             return false;
+                        } else if (lastCheckpoint.containsKey(tp) && lastCheckpoint.get(tp).equals(offsets.get(tp))) {
+                            log.info("Checkpoint is the same as previous checkpoint");
+                            return false;
                         }
                     }
                     ret.set(offsets);
@@ -867,9 +942,12 @@ public class MirrorConnectorsIntegrationBaseTest {
                 for (TopicPartition tp : tps) {
                     assertTrue(consumerGroupOffsets.containsKey(tp),
                             "TopicPartition " + tp + " does not have translated offsets");
-                    assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp) - offsetLagMax,
-                            "TopicPartition " + tp + " does not have fully-translated offsets");
-                    assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                    long offset = consumerGroupOffsets.get(tp).offset();
+                    assertTrue(offset > lastOffset.get(tp) - offsetLagMax,
+                            "TopicPartition " + tp + " does not have fully-translated offsets: "
+                                    + offset + " is not close enough to " + lastOffset.get(tp)
+                                    + " (strictly more than " + (lastOffset.get(tp) - offsetLagMax) + ")");
+                    assertTrue(offset <= endOffsets.get(tp).offset(),
                             "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
                 }
                 return true;