Posted to jira@kafka.apache.org by "C0urante (via GitHub)" <gi...@apache.org> on 2023/04/24 10:54:02 UTC

[GitHub] [kafka] C0urante commented on a diff in pull request #13429: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication

C0urante commented on code in PR #13429:
URL: https://github.com/apache/kafka/pull/13429#discussion_r1174698662


##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -25,17 +25,46 @@
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
+ *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[j].upstream + 2^(i-2) <= syncs[i].upstream</li>

Review Comment:
   I found this easier to grok when invariants B and C both established clear bounds on the value of `syncs[i]`:
   
   ```suggestion
    *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream >= syncs[j].upstream + 2^(i-2)</li>
   ```
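When both invariants are read as bounds on `syncs[i]`, each reduces to a check on the difference `syncs[i].upstream - syncs[j].upstream`. Below is a minimal sketch that checks invariants B and C over every pair of a plain `long[]` of upstream offsets. The class and method names are hypothetical. It simplifies by treating equal upstream offsets as the same sync (the diff compares `OffsetSync` references), and it assumes non-negative offsets, so the subtraction cannot overflow. Because offsets are integers, the fractional bound 2^(i-2) for i = 0 and i = 1 is rounded up to 1.

```java
final class SyncInvariantCheck {

    // Invariant B as an upper bound on syncs[i]: syncs[i] - syncs[j] <= 2^j - 2^i (i < j).
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        // Exact for 0 <= i < j <= 63: 1L << 63 is negative, but the wrapping
        // subtraction still yields 2^63 - 2^i, which fits in a long.
        long maxGap = (1L << j) - (1L << i);
        return iUpstream - jUpstream <= maxGap;
    }

    // Invariant C as a lower bound on syncs[i]: syncs[i] - syncs[j] >= 2^(i-2) (i < j).
    // Offsets are integers, so the fractional bounds for i = 0 and i = 1 round up to 1.
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        long minGap = i < 2 ? 1 : 1L << (i - 2);
        return iUpstream - jUpstream >= minGap;
    }

    // Checks invariant B (checkB == true) or C (checkB == false) for every pair i < j.
    static boolean holdsForAllPairs(long[] syncs, boolean checkB) {
        if (syncs.length > 64) {
            throw new IllegalArgumentException("this sketch assumes at most 64 slots");
        }
        for (int i = 0; i < syncs.length; i++) {
            for (int j = i + 1; j < syncs.length; j++) {
                if (syncs[i] == syncs[j]) {
                    continue; // simplification: equal offsets stand for the same sync
                }
                boolean ok = checkB
                        ? invariantB(syncs[i], syncs[j], i, j)
                        : invariantC(syncs[i], syncs[j], i);
                if (!ok) {
                    return false;
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] valid = {100, 100, 99, 96};
        long[] tooOld = {100, 10};             // 100 - 10 > 2^1 - 2^0: violates B
        long[] tooClose = {10, 10, 10, 10, 9}; // 10 - 9 < 2^(3-2): violates C
        System.out.println(holdsForAllPairs(valid, true) && holdsForAllPairs(valid, false));
        System.out.println(holdsForAllPairs(tooOld, true));
        System.out.println(holdsForAllPairs(tooClose, false));
    }
}
```

The javadoc notes that checking adjacent indexes is enough in the real store. The all-pairs loop here is quadratic, but with at most 64 slots it stays cheap enough for a test helper.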



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -25,17 +25,46 @@
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
+ *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[j].upstream + 2^(i-2) <= syncs[i].upstream</li>
+ *     <li>Invariant D: syncs[63] is the earliest offset sync from the syncs topic which was not eligible for compaction</li>
+ * </ul>
+ * <p>The above invariants ensure that the store is kept updated upon receipt of each sync, and that distinct
+ * offset syncs are separated by approximately exponential space. They can be checked locally (by comparing all adjacent
+ * indexes) but hold globally (for all pairs of any distance). This allows updates to the store in linear time.
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, if replication
+ * started after the position of the consumer group, or if relevant offset syncs for the topic were eligible for
+ * compaction at the time of the initial read-to-end.

Review Comment:
   I'm still not a huge fan of the "eligible for compaction" phrase here and above. It does seem to cover the exact scenarios it intends to, but it's a bit misleading. The fears we have about reusing offset syncs for which a new sync on the same topic partition is available don't have to do with compaction, they have to do with compression of in-memory state and that compression not being applied identically across restarts.
   
   I'd like it if we could a) use language that doesn't imply that compaction is the issue and b) add the rationale for not reusing older offset syncs in a comment somewhere (possibly where we do the `!readToEnd` check?).
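
The translation rules in the javadoc above can be illustrated with a standalone sketch. Translation is unavailable before the initial read-to-end and when a partition has no syncs. Otherwise, the sync that most closely precedes the group's offset is used. The "exact match returns the downstream offset, anything later returns downstream + 1, anything before every sync returns -1" rule is inferred from the expectations in `testOffsetTranslation`, not from the store's translation code, which the diff does not show. `Sync` is a hypothetical stand-in for `OffsetSync`.

```java
import java.util.OptionalLong;

final class SparseTranslationSketch {

    // Hypothetical stand-in for OffsetSync: one upstream/downstream offset pair.
    static final class Sync {
        final long upstream;
        final long downstream;

        Sync(long upstream, long downstream) {
            this.upstream = upstream;
            this.downstream = downstream;
        }
    }

    // syncs[0] is the latest sync, and upstream offsets never increase with the index.
    static OptionalLong translateDownstream(boolean readToEnd, Sync[] syncs, long upstreamOffset) {
        if (!readToEnd || syncs == null) {
            // Initial read-to-end not finished, or no syncs for this topic-partition.
            return OptionalLong.empty();
        }
        for (Sync sync : syncs) {
            if (sync.upstream <= upstreamOffset) {
                // The first hit is the sync that most closely precedes the group's offset.
                // Past an exact match, only one beyond the synced downstream offset is reported.
                return OptionalLong.of(sync.upstream == upstreamOffset ? sync.downstream : sync.downstream + 1);
            }
        }
        // The group is behind every stored sync.
        return OptionalLong.of(-1L);
    }

    public static void main(String[] args) {
        Sync[] syncs = {new Sync(150, 251), new Sync(100, 200)};
        System.out.println(translateDownstream(true, syncs, 150));  // exact match
        System.out.println(translateDownstream(true, syncs, 120));  // past the 100 -> 200 sync
        System.out.println(translateDownstream(true, syncs, 5));    // before all syncs
        System.out.println(translateDownstream(false, syncs, 150)); // store not started
    }
}
```

Because the scan starts at the latest sync and stops at the first one at or below the group's offset, coarser (older) slots only come into play for groups that lag further behind. This matches the javadoc's point about better precision near the live end.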



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, current);
+                assert invariantC(syncs[previous], syncs[current], previous, current);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);

Review Comment:
   Is my understanding correct that the right-hand expression here is at risk of overflow and, to account for that, we include the `bound < 0` condition in the return statement below?
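
One way to check the reasoning behind this question uses two assumptions: upstream offsets are non-negative, and 0 <= i < j <= 63. Under those assumptions, the true value of `upstream + 2^j - 2^i` lies between 1 and `Long.MAX_VALUE + 2^63 - 1`. The wrapped `long` result is therefore negative exactly when the true value exceeds `Long.MAX_VALUE`, and it is never zero. The sketch below compares the wrapped expression from the diff against an exact `BigInteger` computation. The return statement of `invariantB` is not part of the quoted hunk, so the `bound < 0 || ...` form used here is an assumption.

```java
import java.math.BigInteger;

final class InvariantBOverflowSketch {

    // The expression from the diff; long arithmetic wraps on overflow.
    static long wrappingBound(long jUpstream, int i, int j) {
        return jUpstream + (1L << j) - (1L << i);
    }

    // The same quantity computed exactly.
    static BigInteger exactBound(long jUpstream, int i, int j) {
        return BigInteger.valueOf(jUpstream)
                .add(BigInteger.ONE.shiftLeft(j))
                .subtract(BigInteger.ONE.shiftLeft(i));
    }

    // Assumed shape of the check: a negative bound means the true bound exceeds
    // Long.MAX_VALUE, so any valid offset at index i satisfies invariant B.
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        long bound = wrappingBound(jUpstream, i, j);
        return bound < 0 || iUpstream <= bound;
    }

    // True if, for every sample offset and every 0 <= i < j <= 63,
    // the wrapped bound is negative exactly when the exact bound overflows.
    static boolean negativeExactlyWhenOverflowing(long[] jUpstreams) {
        BigInteger max = BigInteger.valueOf(Long.MAX_VALUE);
        for (long u : jUpstreams) {
            for (int j = 1; j < 64; j++) {
                for (int i = 0; i < j; i++) {
                    boolean overflows = exactBound(u, i, j).compareTo(max) > 0;
                    boolean negative = wrappingBound(u, i, j) < 0;
                    if (overflows != negative) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long[] samples = {0L, 1L, 1000L, Long.MAX_VALUE / 2, Long.MAX_VALUE - 5, Long.MAX_VALUE};
        System.out.println(negativeExactlyWhenOverflowing(samples));
    }
}
```

An alternative that sidesteps the question entirely is to compare differences, `iUpstream - jUpstream <= (1L << j) - (1L << i)`. With non-negative offsets neither side of that comparison can overflow.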



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs

Review Comment:
   ```suggestion
               // Rewinding upstream offsets should clear all historical syncs
   ```
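The rewind in this test is handled by the first branch of `updateSyncArray` in the diff. That branch resets every slot when the initial read-to-end has not finished, or when the incoming upstream offset is behind `syncs[0]`. The sketch below isolates that branch over a plain `long[]` of upstream offsets. The names are hypothetical, and the comment on the `!readToEnd` case paraphrases the rationale from the earlier review comment (in-memory compression not being applied identically across restarts) rather than text from the PR.

```java
import java.util.Arrays;

final class ResetGateSketch {

    // Mirrors the condition `!readToEnd || syncs[0].upstreamOffset() > upstreamOffset`.
    static boolean shouldReset(boolean readToEnd, long latestUpstream, long incomingUpstream) {
        // !readToEnd: which older syncs survive in-memory compression depends on where
        // reading started, so they could differ after a restart; keep only the latest one.
        // Rewind: the upstream partition went backwards, so every stored sync is stale.
        return !readToEnd || latestUpstream > incomingUpstream;
    }

    // Applies only the reset branch; the merge branch of updateSyncArray is not modeled.
    static boolean resetIfNeeded(long[] upstream, boolean readToEnd, long incomingUpstream) {
        if (shouldReset(readToEnd, upstream[0], incomingUpstream)) {
            Arrays.fill(upstream, incomingUpstream);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        long[] upstream = {10000, 9990, 9970, 9940};
        boolean reset = resetIfNeeded(upstream, true, 1500); // upstream rewind, as in the test
        System.out.println(reset + " " + Arrays.toString(upstream));
    }
}
```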



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {

Review Comment:
   IMO vanilla multiplication is more readable here:
   ```suggestion
           for (long step = 1; step < maxStep; step = (step * 2) + 1)  {
   ```
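For non-negative values below `Long.MAX_VALUE / 2`, `(step << 1) + 1` and `step * 2 + 1` compute the same thing, so this suggestion only affects readability. Both forms visit steps of the form 2^k - 1. The sketch below copies only the loop bounds from the test. It checks that the two forms agree and that the largest step times `iterations` still fits in a `long`. That last property looks like the purpose of `maxStep`, but this is an inference, not something stated in the PR.

```java
import java.util.ArrayList;
import java.util.List;

final class StepSequenceSketch {

    // Steps visited by the loop as written in the diff.
    static List<Long> shiftSteps(long maxStep) {
        List<Long> steps = new ArrayList<>();
        for (long step = 1; step < maxStep; step = (step << 1) + 1) {
            steps.add(step);
        }
        return steps;
    }

    // Steps visited by the suggested loop.
    static List<Long> multiplySteps(long maxStep) {
        List<Long> steps = new ArrayList<>();
        for (long step = 1; step < maxStep; step = (step * 2) + 1) {
            steps.add(step);
        }
        return steps;
    }

    public static void main(String[] args) {
        long iterations = 1000000;
        long maxStep = Long.MAX_VALUE / iterations;
        List<Long> steps = multiplySteps(maxStep);
        long largest = steps.get(steps.size() - 1);
        System.out.println(steps.equals(shiftSteps(maxStep)));
        System.out.println(steps.size() + " steps, largest " + largest);
        // Throws ArithmeticException if step * iterations would overflow.
        System.out.println(Math.multiplyExact(largest, iterations));
    }
}
```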



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache

Review Comment:
   ```suggestion
           // Each new sync should expire at most two other syncs from the cache (in addition to always adding a single sync to the cache)
   ```
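The suggested wording implies a simple bound. Each update adds one new sync and expires at most two, so the number of distinct syncs can fall by at most one per update (after = before + 1 - expired). Below is a hypothetical helper a test could use to express that bound. It counts runs of repeated values, mirroring how `offsetArrayToString` elides repeated syncs. As a simplification, it assumes distinct syncs have distinct upstream offsets, including the new sync.

```java
final class DistinctSyncCountSketch {

    // Counts runs of equal upstream offsets, collapsing repeats like offsetArrayToString.
    static int countDistinct(long[] upstream) {
        int distinct = 0;
        for (int i = 0; i < upstream.length; i++) {
            if (i == 0 || upstream[i] != upstream[i - 1]) {
                distinct++;
            }
        }
        return distinct;
    }

    // One sync is always added, so "at most two expired" means the distinct
    // count drops by at most one between consecutive states.
    static boolean expiredAtMostTwo(long[] before, long[] after) {
        return countDistinct(after) >= countDistinct(before) - 1;
    }

    public static void main(String[] args) {
        long[] before = {10, 10, 9, 9};
        long[] after = {12, 10, 9, 9}; // new sync added, nothing expired
        System.out.println(expiredAtMostTwo(before, after));
    }
}
```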



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored

Review Comment:
   I read this as a TODO at first; the "Consider ..." language threw me. Do you think this might be clearer?
   ```suggestion
               // We can potentially use oldValue instead of replacement, which allows us to keep more distinct values stored
               // We check invariants B and C with oldValue to see if it can be used
   ```
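To make the suggested comment concrete, here is a minimal sketch of the eligibility check for `oldValue`. Several things are assumed here. `OldValueCheck` and `canUseOldValue` are hypothetical names. Raw `long` upstream offsets stand in for `OffsetSync` objects, so value equality replaces the reference equality (`iSync == jSync`) used in the diff. The return expression of `invariantC` is cut off in the quoted hunk, so its body is modeled on `invariantB`.

```java
// Hypothetical sketch: long upstream offsets stand in for OffsetSync objects,
// and value equality on offsets stands in for reference equality on syncs.
final class OldValueCheck {
    private OldValueCheck() { }

    // Invariant B as an upper bound on syncs[i].upstream; mirrors invariantB in the diff.
    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        long bound = jUpstream + (1L << j) - (1L << i);
        return iUpstream == jUpstream || bound < 0 || iUpstream <= bound;
    }

    // Invariant C as a lower bound on syncs[i].upstream. The return expression is an
    // assumption, since the quoted hunk cuts off before it.
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        long bound = jUpstream + (1L << Math.max(i - 2, 0));
        return iUpstream == jUpstream || bound < 0 || iUpstream >= bound;
    }

    // Can oldValue be stored at index `current` instead of the newer replacement?
    static boolean canUseOldValue(long[] syncs, long oldValue, int current) {
        int previous = current - 1;
        // Recent enough for this index (invariant B against the previous slot)
        boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
        // Far enough from the previous slot (invariant C)
        boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
        // Far enough from the next slot (invariant C), if there is one
        int next = current + 1;
        boolean separatedFromNext = next >= syncs.length || invariantC(oldValue, syncs[next], current);
        return isRecent && separatedFromPrevious && separatedFromNext;
    }
}
```

Take `syncs = {100, 90, 0, 0}` and `current = 1`. Invariant B requires `100 <= oldValue + 2 - 1`, so an `oldValue` of 95 is rejected as too old for index 1, while 99 is accepted.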



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, current);
+                assert invariantC(syncs[previous], syncs[current], previous, current);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+    }
+
+    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i, int j) {

Review Comment:
   We don't use `j` in this invariant:
   ```suggestion
       private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
   ```
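The inequalities below make this easier to see. They write u_k for `syncs[k].upstream`, with invariant C in the lower-bound form suggested at the start of this review. In C, `j` only selects `jSync`, and the exponent depends on `i` alone. The code clamps that exponent with `Math.max(i - 2, 0)`. Assuming strictly increasing upstream offsets, this is harmless for `i < 2`, because two distinct integer offsets already differ by at least 1.

```latex
\begin{aligned}
\text{(B)}\quad & u_i \le u_j + 2^{j} - 2^{i} \\
\text{(C)}\quad & u_i \ge u_j + 2^{\max(i-2,\,0)}
\end{aligned}
\qquad \text{for all } i < j \text{ with } \mathrm{syncs}[i] \ne \mathrm{syncs}[j]
```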



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];

Review Comment:
   This is completely optional; feel free to include, omit, or modify it at your discretion.
   ```suggestion
           // The most-recently-discarded offset sync
           // We track this since it may still be eligible for use in the syncs array at a later index
           OffsetSync oldValue = syncs[0];
   ```
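The following is a minimal, self-contained model of `updateSyncArray` that reads `oldValue` as the most-recently-discarded sync, as in the suggested comment. It rests on these assumptions:

- `SparseSyncArray` is a hypothetical name.
- `long` upstream offsets stand in for `OffsetSync` objects, so reference equality becomes value equality.
- `readToEnd` is taken to be `true`.
- Upstream offsets increase strictly, apart from rewinds.
- The array size of 64 is inferred from the tests' use of index 63.

```java
import java.util.Arrays;

// Hypothetical model of updateSyncArray: long upstream offsets stand in for
// OffsetSync objects, readToEnd is assumed true, and SIZE = 64 is inferred
// from the tests' use of index 63.
final class SparseSyncArray {
    static final int SIZE = 64;
    final long[] syncs = new long[SIZE];

    SparseSyncArray(long firstUpstream) {
        // An array of identical entries satisfies invariants B and C trivially
        Arrays.fill(syncs, firstUpstream);
    }

    static boolean invariantB(long iUpstream, long jUpstream, int i, int j) {
        long bound = jUpstream + (1L << j) - (1L << i);
        return iUpstream == jUpstream || bound < 0 || iUpstream <= bound;
    }

    // Return expression assumed; it is not visible in the quoted hunk
    static boolean invariantC(long iUpstream, long jUpstream, int i) {
        long bound = jUpstream + (1L << Math.max(i - 2, 0));
        return iUpstream == jUpstream || bound < 0 || iUpstream >= bound;
    }

    void update(long upstream) {
        if (syncs[0] > upstream) {
            // Upstream offsets rewound: older syncs are invalid, so overwrite them all
            Arrays.fill(syncs, upstream);
            return;
        }
        long replacement = upstream;
        // The most-recently-discarded sync; it may still be usable at a later index
        long oldValue = syncs[0];
        syncs[0] = replacement; // Invariant A: index 0 always holds the latest sync
        for (int current = 1; current < SIZE; current++) {
            int previous = current - 1;
            int next = current + 1;
            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
            boolean separatedFromNext = next >= SIZE || invariantC(oldValue, syncs[next], current);
            if (isRecent && separatedFromPrevious && separatedFromNext) {
                replacement = oldValue; // prefer keeping the older, more distinct sync
            }
            if (invariantB(syncs[previous], syncs[current], previous, current)) {
                break; // per the diff's comment, B then also holds for all later indices
            }
            // Invariant B violated: discard syncs[current], remembering it for the next index
            oldValue = syncs[current];
            syncs[current] = replacement;
        }
    }
}
```

Start the model at 0 and feed it offsets 1 through 1000. The newest offset stays at index 0, the starting offset 0 stays at the last index, and the stored offsets remain non-increasing from left to right. A later, smaller offset clears the whole array.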



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, current);
+                assert invariantC(syncs[previous], syncs[current], previous, current);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+    }
+
+    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));

Review Comment:
   Same question here regarding susceptibility to overflow and how it's handled.
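For context on the overflow question: `jSync.upstreamOffset() + (1L << Math.max(i - 2, 0))` wraps to a negative value when the upstream offset is close enough to `Long.MAX_VALUE`. The `bound < 0` handling in `invariantB` and in the test's `assertSparseSyncInvariant` then treats the pair as satisfying the invariant. The sketch below shows one possible alternative, which saturates at `Long.MAX_VALUE` using `Math.addExact`. The helper `OverflowSafeBounds` is hypothetical, and the PR does not do this.

```java
// Hypothetical helper contrasting wrapping and saturating bound computations.
final class OverflowSafeBounds {
    private OverflowSafeBounds() { }

    // Saturating add: clamps instead of wrapping around on overflow.
    static long saturatingAdd(long a, long b) {
        try {
            return Math.addExact(a, b);
        } catch (ArithmeticException e) {
            return b > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        }
    }

    // Lower bound for invariant C, syncs[j].upstream + 2^max(i-2, 0), without wrapping.
    static long invariantCLowerBound(long jUpstream, int i) {
        return saturatingAdd(jUpstream, 1L << Math.max(i - 2, 0));
    }

    // The expression as written in the diff, which wraps on overflow.
    static long wrappingLowerBound(long jUpstream, int i) {
        return jUpstream + (1L << Math.max(i - 2, 0));
    }
}
```

The two approaches behave differently. With saturation, a bound that would have overflowed makes invariant C fail for any offset below `Long.MAX_VALUE` instead of passing trivially. Which behavior is preferable depends on which failure mode the store should favor.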



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,

Review Comment:
   If we rewrite invariant C to establish a lower bound for `syncs[i]`, we should probably also rewrite this part to reflect that:
   ```suggestion
                          iUpstream >= iUpstreamLowerBound,
   ```
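Following the suggestion, here is a minimal sketch of the invariant check with both invariants phrased as bounds on `syncs[i]`. It differs from the test helper in a few ways:

- It works on a plain `long[]` of upstream offsets instead of a `FakeOffsetSyncStore`.
- It returns a description of the first violation instead of calling JUnit's `assertTrue`.
- The name `SparseSyncInvariantCheck` is hypothetical.

Like the original, it skips equal pairs and bounds that overflowed to a negative value.

```java
// Hypothetical checker over raw upstream offsets; returns null when both invariants hold.
final class SparseSyncInvariantCheck {
    private SparseSyncInvariantCheck() { }

    static String firstViolation(long[] upstream) {
        for (int j = 0; j < upstream.length; j++) {
            for (int i = 0; i < j; i++) {
                long iUpstream = upstream[i];
                long jUpstream = upstream[j];
                if (iUpstream == jUpstream) {
                    continue; // identical syncs satisfy both invariants trivially
                }
                // Invariant C: lower bound on syncs[i]
                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
                if (iUpstreamLowerBound >= 0) {
                    boolean holds = iUpstream >= iUpstreamLowerBound;
                    if (!holds) {
                        return "Invariant C violated for i=" + i + ", j=" + j;
                    }
                }
                // Invariant B: upper bound on syncs[i]
                long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
                if (iUpstreamUpperBound >= 0) {
                    boolean holds = iUpstream <= iUpstreamUpperBound;
                    if (!holds) {
                        return "Invariant B violated for i=" + i + ", j=" + j;
                    }
                }
            }
        }
        return null;
    }
}
```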



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {

Review Comment:
   Invariants B and C only constrain pairs where `i < j`; shouldn't the loop bound reflect that (especially since the `jUpstream == iUpstream` branch below already makes it unnecessary to test the case `i == j`)?
   ```suggestion
               for (int i = 0; i < j; i++) {
   ```
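   For reference, here is a minimal standalone sketch of the check with the tightened loop bound. It models the per-partition store as a plain `long[]` of upstream offsets (index 0 = latest sync). That model is an assumption for illustration, and `SparseSyncInvariantCheck`/`violations` are hypothetical names that are not part of the PR:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class SparseSyncInvariantCheck {
   
       /**
        * Hypothetical model: upstream[k] is the upstream offset of the sync stored at index k,
        * with index 0 holding the latest sync (Invariant A).
        * Returns one entry per violated (i, j) pair; an empty list means B and C both hold.
        */
       static List<String> violations(long[] upstream) {
           if (upstream.length > 62) {
               // Keeps 2^j representable without overflow in this simplified sketch.
               throw new IllegalArgumentException("sketch supports at most 62 slots");
           }
           List<String> result = new ArrayList<>();
           for (int j = 0; j < upstream.length; j++) {
               // Invariants B and C only constrain pairs with i < j, so i == j is never visited.
               for (int i = 0; i < j; i++) {
                   long iUpstream = upstream[i];
                   long jUpstream = upstream[j];
                   if (iUpstream == jUpstream) {
                       // Repeated syncs are exempt, mirroring the test's jUpstream == iUpstream check.
                       continue;
                   }
                   // Invariant C, with the exponent clamped at 0 as in the test.
                   long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
                   // Invariant B.
                   long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
                   if (iUpstream < iUpstreamLowerBound) {
                       result.add("C(" + i + "," + j + ")");
                   }
                   if (iUpstream > iUpstreamUpperBound) {
                       result.add("B(" + i + "," + j + ")");
                   }
               }
           }
           return result;
       }
   
       public static void main(String[] args) {
           System.out.println(violations(new long[] {10, 9, 8, 6}));  // []
           System.out.println(violations(new long[] {100, 0, 0, 0})); // [B(0,1), B(0,2), B(0,3)]
       }
   }
   ```
   
   With `i < j` the diagonal is never visited, so the equality check only has to handle repeated syncs stored at distinct indices.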



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 1, 0) + ")"
+                );
+                long iUpstreamBound = jUpstream + (1L << j) - (1L << i);

Review Comment:
   Nit: `iUpstreamUpperBound` is clearer, especially since we already have `iUpstreamLowerBound` declared in the same scope.
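   Related to naming: the `< 0` checks in this helper appear to act as an overflow guard. A hypothetical alternative keeps both bounds clearly named and makes the overflow explicit with `Math.addExact`. The `InvariantBounds` class and its methods are illustrative only:
   
   ```java
   import java.util.OptionalLong;
   
   public class InvariantBounds {
   
       /**
        * Hypothetical helper: lower bound on syncs[i].upstream from Invariant C,
        * syncs[j].upstream + 2^(i-2), with the exponent clamped at 0 as in the test.
        * Returns empty if the sum would overflow a long.
        */
       static OptionalLong iUpstreamLowerBound(long jUpstream, int i) {
           if (i < 0 || i > 63) {
               throw new IllegalArgumentException("expected 0 <= i <= 63");
           }
           try {
               return OptionalLong.of(Math.addExact(jUpstream, 1L << Math.max(i - 2, 0)));
           } catch (ArithmeticException overflow) {
               return OptionalLong.empty();
           }
       }
   
       /**
        * Hypothetical helper: upper bound on syncs[i].upstream from Invariant B,
        * syncs[j].upstream + 2^j - 2^i, for 0 <= i < j <= 62.
        * Returns empty if the sum would overflow a long.
        */
       static OptionalLong iUpstreamUpperBound(long jUpstream, int i, int j) {
           if (i < 0 || i >= j || j > 62) {
               throw new IllegalArgumentException("expected 0 <= i < j <= 62");
           }
           // 2^j - 2^i cannot overflow for j <= 62, so only the final addition needs checking.
           long span = (1L << j) - (1L << i);
           try {
               return OptionalLong.of(Math.addExact(jUpstream, span));
           } catch (ArithmeticException overflow) {
               return OptionalLong.empty();
           }
       }
   
       public static void main(String[] args) {
           System.out.println(iUpstreamLowerBound(6, 2));                 // OptionalLong[7]
           System.out.println(iUpstreamUpperBound(6, 2, 3));              // OptionalLong[10]
           System.out.println(iUpstreamUpperBound(Long.MAX_VALUE, 0, 1)); // OptionalLong.empty
       }
   }
   ```
   
   Returning `OptionalLong.empty()` lets the caller skip a pair explicitly rather than inferring overflow from a negative sum.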



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 1, 0) + ")"
+                );
+                long iUpstreamBound = jUpstream + (1L << j) - (1L << i);
+                if (iUpstreamBound < 0)
+                    continue;
+                assertTrue(
+                        iUpstream <= iUpstreamBound,
+                        "Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be no greater than " + iUpstreamBound + " (" + jUpstream + " + 2^" + j + ")"

Review Comment:
   This doesn't match the invariant (or the right-hand expression for the declaration of `iUpstreamBound`):
   ```suggestion
                                   + " should be no greater than " + iUpstreamBound + " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")"
   ```
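   One way to keep the message and the bound from drifting apart is to build both from the same terms. The lower-bound message a few lines above seems to have the same issue: it prints `Math.max(i - 1, 0)` as the exponent, while `iUpstreamLowerBound` is computed with `Math.max(i - 2, 0)`. A minimal sketch; the `BoundDescription` class is hypothetical:
   
   ```java
   public class BoundDescription {
   
       /** Pairs a computed bound with the expression that produced it, so the two cannot drift. */
       static final class Bound {
           final long value;
           final String expression;
   
           Bound(long value, String expression) {
               this.value = value;
               this.expression = expression;
           }
       }
   
       // Invariant B: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i
       static Bound upperBound(long jUpstream, int i, int j) {
           return new Bound(jUpstream + (1L << j) - (1L << i),
                   jUpstream + " + 2^" + j + " - 2^" + i);
       }
   
       // Invariant C, exponent clamped at 0 as in the test: syncs[i].upstream >= syncs[j].upstream + 2^(i-2)
       static Bound lowerBound(long jUpstream, int i) {
           int exponent = Math.max(i - 2, 0);
           return new Bound(jUpstream + (1L << exponent), jUpstream + " + 2^" + exponent);
       }
   
       public static void main(String[] args) {
           Bound upper = upperBound(6, 2, 3);
           System.out.println(upper.value + " (" + upper.expression + ")"); // 10 (6 + 2^3 - 2^2)
           Bound lower = lowerBound(6, 1);
           System.out.println(lower.value + " (" + lower.expression + ")"); // 7 (6 + 2^0)
       }
   }
   ```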



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous, current);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values

Review Comment:
   Why? (Feel free to explain in the GH discussion thread instead of in code comments if a succinct explanation is difficult.)



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java:
##########
@@ -139,10 +180,141 @@ public void close() {
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // Old offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // Consider using oldValue instead of replacement, which allows us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current, next);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.

Review Comment:
   ```suggestion
               // The replacement variable always contains a value which satisfies the invariants for this index.
               // Note that this does not mean that we have to use the replacement (since the invariants could already be satisfied without overwriting the current sync)
   ```
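
   A minimal sketch of the two checks that `replacement` is meant to satisfy may help here. It assumes the definitions from the class Javadoc (invariant B: `syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i`; invariant C: `syncs[i].upstream >= syncs[j].upstream + 2^(i-2)`, with the exponent clamped at 0 as `assertSparseSyncInvariant` does). It works on a hypothetical `long[]` of upstream offsets ordered newest-first, not on the PR's `OffsetSync` array or its `invariantB`/`invariantC` helpers, and assumes offsets are small enough that the bounds do not overflow.

   ```java
   import java.util.Arrays;

   public class SyncInvariants {
       // Hypothetical check of invariants B and C over every pair i < j of upstream offsets,
       // ordered newest-first. Equal entries are skipped, mirroring "syncs[i] != syncs[j]".
       static boolean satisfiesBAndC(long[] upstream) {
           for (int j = 0; j < upstream.length; j++) {
               for (int i = 0; i < j; i++) {
                   if (upstream[i] == upstream[j]) {
                       continue;
                   }
                   // Invariant B: the newer entry is at most 2^j - 2^i ahead of the older one.
                   long upperBound = upstream[j] + (1L << j) - (1L << i);
                   // Invariant C: the newer entry is at least 2^max(i - 2, 0) ahead of the older one.
                   long lowerBound = upstream[j] + (1L << Math.max(i - 2, 0));
                   if (upstream[i] > upperBound || upstream[i] < lowerBound) {
                       return false;
                   }
               }
           }
           return true;
       }

       public static void main(String[] args) {
           // Like clearSyncArray: an array holding a single sync satisfies B and C trivially.
           long[] cleared = new long[8];
           Arrays.fill(cleared, 1000L);
           System.out.println(satisfiesBAndC(cleared)); // true
           // A gap of 10 between indices 0 and 1 exceeds B's bound of 2^1 - 2^0 = 1.
           System.out.println(satisfiesBAndC(new long[] {1010, 1000})); // false
           // The same gap fits once the older sync sits at index 5: 2^5 - 2^4 = 16 >= 10.
           System.out.println(satisfiesBAndC(new long[] {1010, 1010, 1010, 1010, 1010, 1000})); // true
       }
   }
   ```

   Because B forces the leading slots to stay close to `syncs[0]`, several of them end up holding copies of the latest sync until an index is reached whose bound is wide enough to accommodate an older, distinct sync.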



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache

Review Comment:
   Also, it'd be nice to understand why this holds given our invariants. If we can fit that into a comment, great; if not, a GitHub discussion thread is better than nothing.
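
   Independently of the proof, the arithmetic linking the test comment to its assertion is short: in this test every new sync has a strictly larger upstream offset, so it adds exactly one new distinct entry at index 0. If it expires at most two older distinct syncs, the distinct count changes by at least 1 - 2 = -1, which is what `assertTrue(diff >= -1, ...)` checks. A minimal sketch of that bookkeeping, assuming (as invariant C implies for distinct entries) that copies of the same sync sit next to each other; the class and method names are hypothetical.

   ```java
   public class DistinctSyncCount {
       // Mirrors countDistinctStoredSyncs on a non-empty, fixed-size array: with copies of a
       // sync adjacent to each other, counting changes between neighbours counts distinct syncs.
       static int countDistinct(long[] upstream) {
           int count = 1;
           for (int i = 1; i < upstream.length; i++) {
               if (upstream[i - 1] != upstream[i]) {
                   count++;
               }
           }
           return count;
       }

       // A new sync adds one distinct entry; expiring at most maxExpired others
       // means the count may fall by at most maxExpired - 1.
       static boolean withinExpiryBudget(int countBefore, int countAfter, int maxExpired) {
           return countAfter - countBefore >= 1 - maxExpired;
       }

       public static void main(String[] args) {
           // Illustrative arrays only; they are not checked against invariants B and C.
           long[] before = {30, 20, 10, 0};
           long[] after = {40, 30, 10, 0}; // 40 added, 20 expired
           System.out.println(countDistinct(after) - countDistinct(before)); // 0
           System.out.println(withinExpiryBudget(4, 3, 2)); // true: net -1 means two expired
           System.out.println(withinExpiryBudget(4, 2, 2)); // false: net -2 means three expired
       }
   }
   ```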

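   The expectations in `testOffsetTranslation` and `assertSparseSync` pin down the translation behaviour the tests rely on: an upstream offset is translated with the newest stored sync at or before it, mapping exactly to that sync's downstream offset on a match and to one past it otherwise, with `-1` when every stored sync is newer. The sketch below is inferred from those assertions only, not taken from the PR's `translateDownstream`; `Sync` is a hypothetical stand-in for `OffsetSync`, and an empty array stands in for the no-sync case.

   ```java
   import java.util.OptionalLong;

   public class TranslationRuleSketch {
       // Hypothetical stand-in for an offset sync: an upstream offset and its downstream counterpart.
       static final class Sync {
           final long upstream;
           final long downstream;

           Sync(long upstream, long downstream) {
               this.upstream = upstream;
               this.downstream = downstream;
           }
       }

       // syncs must be ordered newest-first (descending upstream offsets).
       static OptionalLong translate(Sync[] syncs, long upstreamOffset) {
           if (syncs.length == 0) {
               return OptionalLong.empty(); // no sync for this partition
           }
           for (Sync sync : syncs) {
               if (sync.upstream <= upstreamOffset) {
                   // Exact match: use the synced downstream offset. Later offsets: one past it,
                   // without dead-reckoning the distance from the sync.
                   return OptionalLong.of(sync.upstream == upstreamOffset ? sync.downstream : sync.downstream + 1);
               }
           }
           return OptionalLong.of(-1); // older than every stored sync: cannot translate
       }

       public static void main(String[] args) {
           Sync[] syncs = {new Sync(150, 251), new Sync(100, 200)};
           System.out.println(translate(syncs, 150)); // OptionalLong[251]
           System.out.println(translate(syncs, 120)); // OptionalLong[201]
           System.out.println(translate(syncs, 5));   // OptionalLong[-1]
       }
   }
   ```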


##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java:
##########
@@ -57,52 +58,170 @@ public void testOffsetTranslation() {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
     @Test
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
     @Test
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10;
+            int offset = 0;
+            for (; offset <= 1000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) {
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewind upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499));
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500));
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000));
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // We should not expire more syncs from the store than necessary;
+        // Each new sync should expire at most two other syncs from the cache
+        long iterations = 1000000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step << 1) + 1)  {
+            try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                int lastCount = 1;
+                store.start();
+                for (long offset = 0; offset <= iterations; offset += step) {
+                    store.sync(tp, offset, offset);
+                    // Invariant A: the latest sync is present
+                    assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                    // Invariant D: the earliest sync is present
+                    assertEquals(0L, store.syncFor(tp, 63).upstreamOffset());
+                    int count = countDistinctStoredSyncs(store, tp);
+                    int diff = count - lastCount;
+                    assertTrue(diff >= -1,
+                            "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                    lastCount = count;
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) {
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1));
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1));
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2));
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        int count = 1;
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) {
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i <= j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) {
+                    continue;
+                }
+                long iUpstreamLowerBound = jUpstream + (1L << Math.max(i - 2, 0));
+                if (iUpstreamLowerBound < 0) {
+                    continue;
+                }
+                assertTrue(
+                        iUpstreamLowerBound <= iUpstream,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 1, 0) + ")"

Review Comment:
   This doesn't match invariant C: the bound is computed with `Math.max(i - 2, 0)`, but the message prints `Math.max(i - 1, 0)`. Should we change it?
   ```suggestion
                                   + " should be at least " + iUpstreamLowerBound + " (" + jUpstream + " + 2^" + Math.max(i - 2, 0) + ")"
   ```
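
   One way to keep the message and the check from drifting apart again is to compute the exponent once and reuse it for both the bound and the text. A minimal sketch with hypothetical helper names, using the `2^max(i - 2, 0)` bound from the `iUpstreamLowerBound` computation:

   ```java
   public class InvariantCMessage {
       // Exponent used by invariant C's lower bound: 2^max(i - 2, 0).
       static int exponent(int i) {
           return Math.max(i - 2, 0);
       }

       static long lowerBound(long jUpstream, int i) {
           return jUpstream + (1L << exponent(i));
       }

       // Builds the assertion message from the same exponent() as the bound,
       // so the "2^k" shown to the reader cannot differ from the value actually checked.
       static String failureMessage(int i, int j, long iUpstream, long jUpstream) {
           return "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
                   + " should be at least " + lowerBound(jUpstream, i)
                   + " (" + jUpstream + " + 2^" + exponent(i) + ")";
       }

       public static void main(String[] args) {
           // Prints: Invariant C(5,7): Upstream offset 95 at position 5 should be at least 98 (90 + 2^3)
           System.out.println(failureMessage(5, 7, 95, 90));
       }
   }
   ```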



-- 
This is an automated message from the Apache Git Service.