You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/05/02 17:41:23 UTC

[kafka] branch 3.3 updated (912ee5a2f95 -> 8d275d4741f)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 912ee5a2f95 MINOR: update docs note about spurious stream-stream join results (#13642)
     new b58ceaf6dd9 KAFKA-14837/14842:Avoid the rebalance caused by the addition and deletion of irrelevant groups for MirrorCheckPointConnector (#13446)
     new 7127a225705 MINOR: Refactor Mirror integration tests to reduce duplication (#13428)
     new 8d275d4741f KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../connect/mirror/MirrorCheckpointConnector.java  |  42 +++-
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  59 ++++-
 .../kafka/connect/mirror/OffsetSyncStore.java      | 192 ++++++++++++++-
 .../mirror/MirrorCheckpointConnectorTest.java      |  51 +++-
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  70 +++++-
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  | 156 ++++++++++--
 .../IdentityReplicationIntegrationTest.java        | 213 +---------------
 .../MirrorConnectorsIntegrationBaseTest.java       | 273 +++++++++++++++------
 8 files changed, 719 insertions(+), 337 deletions(-)


[kafka] 03/03: KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)

Posted by ce...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8d275d4741f1940172988d976daf75b5cf742db0
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Wed Apr 26 00:30:13 2023 -0700

    KAFKA-14666: Add MM2 in-memory offset translation index for offsets behind replication (#13429)
    
    Reviewers: Daniel Urban <du...@cloudera.com>, Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  57 ++++--
 .../kafka/connect/mirror/OffsetSyncStore.java      | 192 ++++++++++++++++++++-
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  70 ++++++--
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  | 156 +++++++++++++++--
 .../MirrorConnectorsIntegrationBaseTest.java       |  90 +++++++++-
 5 files changed, 514 insertions(+), 51 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 1ed1e9f0b7d..cd0ba889b3f 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -41,6 +41,7 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.Collections;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.concurrent.ExecutionException;
 import java.time.Duration;
@@ -66,20 +67,21 @@ public class MirrorCheckpointTask extends SourceTask {
     private MirrorMetrics metrics;
     private Scheduler scheduler;
     private Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset;
-    private Map<String, List<Checkpoint>> checkpointsPerConsumerGroup;
+    private Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup;
     public MirrorCheckpointTask() {}
 
     // for testing
     MirrorCheckpointTask(String sourceClusterAlias, String targetClusterAlias,
             ReplicationPolicy replicationPolicy, OffsetSyncStore offsetSyncStore,
             Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset,
-            Map<String, List<Checkpoint>> checkpointsPerConsumerGroup) {
+            Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup) {
         this.sourceClusterAlias = sourceClusterAlias;
         this.targetClusterAlias = targetClusterAlias;
         this.replicationPolicy = replicationPolicy;
         this.offsetSyncStore = offsetSyncStore;
         this.idleConsumerGroupsOffset = idleConsumerGroupsOffset;
         this.checkpointsPerConsumerGroup = checkpointsPerConsumerGroup;
+        this.topicFilter = topic -> true;
     }
 
     @Override
@@ -164,9 +166,11 @@ public class MirrorCheckpointTask extends SourceTask {
     private List<SourceRecord> sourceRecordsForGroup(String group) throws InterruptedException {
         try {
             long timestamp = System.currentTimeMillis();
-            List<Checkpoint> checkpoints = checkpointsForGroup(group);
-            checkpointsPerConsumerGroup.put(group, checkpoints);
-            return checkpoints.stream()
+            Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets = listConsumerGroupOffsets(group);
+            Map<TopicPartition, Checkpoint> newCheckpoints = checkpointsForGroup(upstreamGroupOffsets, group);
+            Map<TopicPartition, Checkpoint> oldCheckpoints = checkpointsPerConsumerGroup.computeIfAbsent(group, ignored -> new HashMap<>());
+            oldCheckpoints.putAll(newCheckpoints);
+            return newCheckpoints.values().stream()
                 .map(x -> checkpointRecord(x, timestamp))
                 .collect(Collectors.toList());
         } catch (ExecutionException e) {
@@ -175,13 +179,44 @@ public class MirrorCheckpointTask extends SourceTask {
         }
     }
 
-    private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
-        return listConsumerGroupOffsets(group).entrySet().stream()
+    // for testing
+    Map<TopicPartition, Checkpoint> checkpointsForGroup(Map<TopicPartition, OffsetAndMetadata> upstreamGroupOffsets, String group) {
+        return upstreamGroupOffsets.entrySet().stream()
             .filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
             .map(x -> checkpoint(group, x.getKey(), x.getValue()))
             .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
             .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately
-            .collect(Collectors.toList());
+            .filter(this::checkpointIsMoreRecent) // do not emit checkpoints for partitions that have a later checkpoint
+            .collect(Collectors.toMap(Checkpoint::topicPartition, Function.identity()));
+    }
+
+    private boolean checkpointIsMoreRecent(Checkpoint checkpoint) {
+        Map<TopicPartition, Checkpoint> checkpoints = checkpointsPerConsumerGroup.get(checkpoint.consumerGroupId());
+        if (checkpoints == null) {
+            log.trace("Emitting {} (first for this group)", checkpoint);
+            return true;
+        }
+        Checkpoint lastCheckpoint = checkpoints.get(checkpoint.topicPartition());
+        if (lastCheckpoint == null) {
+            log.trace("Emitting {} (first for this partition)", checkpoint);
+            return true;
+        }
+        // Emit a checkpoint after a rewind of the upstream consumer group takes place (checkpoints can be non-monotonic)
+        if (checkpoint.upstreamOffset() < lastCheckpoint.upstreamOffset()) {
+            log.trace("Emitting {} (upstream offset rewind)", checkpoint);
+            return true;
+        }
+        // Or if the downstream offset is newer (force checkpoints to be monotonic)
+        if (checkpoint.downstreamOffset() > lastCheckpoint.downstreamOffset()) {
+            log.trace("Emitting {} (downstream offset advanced)", checkpoint);
+            return true;
+        }
+        if (checkpoint.downstreamOffset() != lastCheckpoint.downstreamOffset()) {
+            log.trace("Skipping {} (preventing downstream rewind)", checkpoint);
+        } else {
+            log.trace("Skipping {} (repeated checkpoint)", checkpoint);
+        }
+        return false;
     }
 
     private Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
@@ -198,7 +233,7 @@ public class MirrorCheckpointTask extends SourceTask {
         if (offsetAndMetadata != null) {
             long upstreamOffset = offsetAndMetadata.offset();
             OptionalLong downstreamOffset =
-                offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+                offsetSyncStore.translateDownstream(group, topicPartition, upstreamOffset);
             if (downstreamOffset.isPresent()) {
                 return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
                     upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
@@ -324,10 +359,10 @@ public class MirrorCheckpointTask extends SourceTask {
     Map<String, Map<TopicPartition, OffsetAndMetadata>> getConvertedUpstreamOffset() {
         Map<String, Map<TopicPartition, OffsetAndMetadata>> result = new HashMap<>();
 
-        for (Entry<String, List<Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
+        for (Entry<String, Map<TopicPartition, Checkpoint>> entry : checkpointsPerConsumerGroup.entrySet()) {
             String consumerId = entry.getKey();
             Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = new HashMap<>();
-            for (Checkpoint checkpoint : entry.getValue()) {
+            for (Checkpoint checkpoint : entry.getValue().values()) {
                 convertedUpstreamOffset.put(checkpoint.topicPartition(), checkpoint.offsetAndMetadata());
             }
             result.put(consumerId, convertedUpstreamOffset);
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 52bad401e8a..9d1c2564e21 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -24,17 +24,46 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.apache.kafka.connect.util.TopicAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ConcurrentHashMap;
 
-/** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
+/**
+ * Used internally by MirrorMaker. Stores offset syncs and performs offset translation.
+ * <p>A limited number of offset syncs can be stored per TopicPartition, in a way which provides better translation
+ * later in the topic, closer to the live end of the topic.
+ * This maintains the following invariants for each topic-partition in the in-memory sync storage:
+ * <ul>
+ *     <li>Invariant A: syncs[0] is the latest offset sync from the syncs topic</li>
+ *     <li>Invariant B: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream <= syncs[j].upstream + 2^j - 2^i</li>
+ *     <li>Invariant C: For each i,j, i < j, syncs[i] != syncs[j]: syncs[i].upstream >= syncs[j].upstream + 2^(i-2)</li>
+ *     <li>Invariant D: syncs[63] is the earliest offset sync from the syncs topic usable for translation</li>
+ * </ul>
+ * <p>The above invariants ensure that the store is kept updated upon receipt of each sync, and that distinct
+ * offset syncs are separated by approximately exponential space. They can be checked locally (by comparing all adjacent
+ * indexes) but hold globally (for all pairs of any distance). This allows updates to the store in linear time.
+ * <p>Offset translation uses the syncs[i] which most closely precedes the upstream consumer group's current offset.
+ * For a fixed in-memory state, translation of variable upstream offsets will be monotonic.
+ * For variable in-memory state, translation of a fixed upstream offset will not be monotonic.
+ * <p>Translation will be unavailable for all topic-partitions before an initial read-to-end of the offset syncs topic
+ * is complete. Translation will be unavailable after that if no syncs are present for a topic-partition, if replication
+ * started after the position of the consumer group, or if relevant offset syncs for the topic were potentially used
+ * for translation in an earlier generation of the sync store.
+ */
 class OffsetSyncStore implements AutoCloseable {
+
+    private static final Logger log = LoggerFactory.getLogger(OffsetSyncStore.class);
+    // Store one offset sync for each bit of the topic offset.
+    // Visible for testing
+    static final int SYNCS_PER_PARTITION = Long.SIZE;
     private final KafkaBasedLog<byte[], byte[]> backingStore;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final Map<TopicPartition, OffsetSync[]> offsetSyncs = new ConcurrentHashMap<>();
     private final TopicAdmin admin;
     protected volatile boolean readToEnd = false;
 
@@ -96,16 +125,21 @@ class OffsetSyncStore implements AutoCloseable {
         readToEnd = true;
     }
 
-    OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+    OptionalLong translateDownstream(String group, TopicPartition sourceTopicPartition, long upstreamOffset) {
         if (!readToEnd) {
             // If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
             // This prevents emitting stale offsets while initially reading the offset syncs topic.
+            log.debug("translateDownstream({},{},{}): Skipped (initial offset syncs read still in progress)",
+                    group, sourceTopicPartition, upstreamOffset);
             return OptionalLong.empty();
         }
-        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
+        Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition, upstreamOffset);
         if (offsetSync.isPresent()) {
             if (offsetSync.get().upstreamOffset() > upstreamOffset) {
                 // Offset is too far in the past to translate accurately
+                log.debug("translateDownstream({},{},{}): Skipped ({} is ahead of upstream consumer group {})",
+                        group, sourceTopicPartition, upstreamOffset,
+                        offsetSync.get(), upstreamOffset);
                 return OptionalLong.of(-1L);
             }
             // If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
@@ -121,8 +155,15 @@ class OffsetSyncStore implements AutoCloseable {
             //          vv
             // target |-sg----r-----|
             long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
+            log.debug("translateDownstream({},{},{}): Translated {} (relative to {})",
+                    group, sourceTopicPartition, upstreamOffset,
+                    offsetSync.get().downstreamOffset() + upstreamStep,
+                    offsetSync.get()
+            );
             return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
         } else {
+            log.debug("translateDownstream({},{},{}): Skipped (offset sync not found)",
+                    group, sourceTopicPartition, upstreamOffset);
             return OptionalLong.empty();
         }
     }
@@ -136,10 +177,147 @@ class OffsetSyncStore implements AutoCloseable {
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
         OffsetSync offsetSync = OffsetSync.deserializeRecord(record);
         TopicPartition sourceTopicPartition = offsetSync.topicPartition();
-        offsetSyncs.put(sourceTopicPartition, offsetSync);
+        offsetSyncs.compute(sourceTopicPartition, (ignored, syncs) ->
+                syncs == null ? createInitialSyncs(offsetSync) : updateExistingSyncs(syncs, offsetSync)
+        );
+    }
+
+    private OffsetSync[] updateExistingSyncs(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // Make a copy of the array before mutating it, so that readers do not see inconsistent data
+        // TODO: consider batching updates so that this copy can be performed less often for high-volume sync topics.
+        OffsetSync[] mutableSyncs = Arrays.copyOf(syncs, SYNCS_PER_PARTITION);
+        updateSyncArray(mutableSyncs, offsetSync);
+        if (log.isTraceEnabled()) {
+            log.trace("New sync {} applied, new state is {}", offsetSync, offsetArrayToString(mutableSyncs));
+        }
+        return mutableSyncs;
+    }
+
+    private String offsetArrayToString(OffsetSync[] syncs) {
+        StringBuilder stateString = new StringBuilder();
+        stateString.append("[");
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            if (i == 0 || syncs[i] != syncs[i - 1]) {
+                if (i != 0) {
+                    stateString.append(",");
+                }
+                // Print only if the sync is interesting, a series of repeated syncs will be elided
+                stateString.append(syncs[i].upstreamOffset());
+                stateString.append(":");
+                stateString.append(syncs[i].downstreamOffset());
+            }
+        }
+        stateString.append("]");
+        return stateString.toString();
+    }
+
+    private OffsetSync[] createInitialSyncs(OffsetSync firstSync) {
+        OffsetSync[] syncs = new OffsetSync[SYNCS_PER_PARTITION];
+        clearSyncArray(syncs, firstSync);
+        return syncs;
+    }
+
+    private void clearSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        // If every element of the store is the same, then it satisfies invariants B and C trivially.
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            syncs[i] = offsetSync;
+        }
+    }
+
+    private void updateSyncArray(OffsetSync[] syncs, OffsetSync offsetSync) {
+        long upstreamOffset = offsetSync.upstreamOffset();
+        // While reading to the end of the topic, ensure that our earliest sync is later than
+        // any earlier sync that could have been used for translation, to preserve monotonicity
+        // If the upstream offset rewinds, all previous offsets are invalid, so overwrite them all.
+        if (!readToEnd || syncs[0].upstreamOffset() > upstreamOffset) {
+            clearSyncArray(syncs, offsetSync);
+            return;
+        }
+        OffsetSync replacement = offsetSync;
+        // The most-recently-discarded offset sync
+        // We track this since it may still be eligible for use in the syncs array at a later index
+        OffsetSync oldValue = syncs[0];
+        // Invariant A is always violated once a new sync appears.
+        // Repair Invariant A: the latest sync must always be updated
+        syncs[0] = replacement;
+        for (int current = 1; current < SYNCS_PER_PARTITION; current++) {
+            int previous = current - 1;
+
+            // We can potentially use oldValue instead of replacement, allowing us to keep more distinct values stored
+            // If oldValue is not recent, it should be expired from the store
+            boolean isRecent = invariantB(syncs[previous], oldValue, previous, current);
+            // Ensure that this value is sufficiently separated from the previous value
+            // We prefer to keep more recent syncs of similar precision (i.e. the value in replacement)
+            boolean separatedFromPrevious = invariantC(syncs[previous], oldValue, previous);
+            // Ensure that this value is sufficiently separated from the next value
+            // We prefer to keep existing syncs of lower precision (i.e. the value in syncs[next])
+            int next = current + 1;
+            boolean separatedFromNext = next >= SYNCS_PER_PARTITION || invariantC(oldValue, syncs[next], current);
+            // If this condition is false, oldValue will be expired from the store and lost forever.
+            if (isRecent && separatedFromPrevious && separatedFromNext) {
+                replacement = oldValue;
+            }
+
+            // The replacement variable always contains a value which satisfies the invariants for this index.
+            // This replacement may or may not be used, since the invariants could already be satisfied,
+            // and in that case, prefer to keep the existing tail of the syncs array rather than updating it.
+            assert invariantB(syncs[previous], replacement, previous, current);
+            assert invariantC(syncs[previous], replacement, previous);
+
+            // Test if changes to the previous index affected the invariant for this index
+            if (invariantB(syncs[previous], syncs[current], previous, current)) {
+                // Invariant B holds for syncs[current]: it must also hold for all later values
+                break;
+            } else {
+                // Invariant B violated for syncs[current]: sync is now too old and must be updated
+                // Repair Invariant B: swap in replacement, and save the old value for the next iteration
+                oldValue = syncs[current];
+                syncs[current] = replacement;
+
+                assert invariantB(syncs[previous], syncs[current], previous, current);
+                assert invariantC(syncs[previous], syncs[current], previous);
+            }
+        }
+    }
+
+    private boolean invariantB(OffsetSync iSync, OffsetSync jSync, int i, int j) {
+        long bound = jSync.upstreamOffset() + (1L << j) - (1L << i);
+        return iSync == jSync || bound < 0 || iSync.upstreamOffset() <= bound;
+    }
+
+    private boolean invariantC(OffsetSync iSync, OffsetSync jSync, int i) {
+        long bound = jSync.upstreamOffset() + (1L << Math.max(i - 2, 0));
+        return iSync == jSync || (bound >= 0 && iSync.upstreamOffset() >= bound);
+    }
+
+    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition, long upstreamOffset) {
+        return Optional.ofNullable(offsetSyncs.get(topicPartition))
+                .map(syncs -> lookupLatestSync(syncs, upstreamOffset));
+    }
+
+
+    private OffsetSync lookupLatestSync(OffsetSync[] syncs, long upstreamOffset) {
+        // Linear search over the stored syncs; since they are spaced roughly exponentially, this is effectively a binary search over the topic offsets
+        // Search from latest to earliest to find the sync that gives the best accuracy
+        for (int i = 0; i < SYNCS_PER_PARTITION; i++) {
+            OffsetSync offsetSync = syncs[i];
+            if (offsetSync.upstreamOffset() <= upstreamOffset) {
+                return offsetSync;
+            }
+        }
+        return syncs[SYNCS_PER_PARTITION - 1];
     }
 
-    private Optional<OffsetSync> latestOffsetSync(TopicPartition topicPartition) {
-        return Optional.ofNullable(offsetSyncs.get(topicPartition));
+    // For testing
+    OffsetSync syncFor(TopicPartition topicPartition, int syncIdx) {
+        OffsetSync[] syncs = offsetSyncs.get(topicPartition);
+        if (syncs == null)
+            throw new IllegalArgumentException("No syncs present for " + topicPartition);
+        if (syncIdx >= syncs.length)
+            throw new IllegalArgumentException(
+                    "Requested sync " + (syncIdx + 1) + " for " + topicPartition
+                            + " but there are only " + syncs.length + " syncs available for that topic partition"
+            );
+        return syncs[syncIdx];
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 500cb6c131a..f9bc7bc76cb 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -16,12 +16,11 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Collections;
 import java.util.Optional;
+import java.util.OptionalLong;
 
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -31,6 +30,7 @@ import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class MirrorCheckpointTaskTest {
@@ -118,7 +118,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testSyncOffset() {
         Map<String, Map<TopicPartition, OffsetAndMetadata>> idleConsumerGroupsOffset = new HashMap<>();
-        Map<String, List<Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+        Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
 
         String consumer1 = "consumer1";
         String consumer2 = "consumer2";
@@ -147,16 +147,16 @@ public class MirrorCheckpointTaskTest {
         // 'cpC2T2p0' denotes 'checkpoint' of topic2, partition 0 for consumer2
         Checkpoint cpC2T2P0 = new Checkpoint(consumer2, new TopicPartition(topic2, 0), 100, 51, "metadata");
 
-        // 'checkpointListC1' denotes 'checkpoint' list for consumer1
-        List<Checkpoint> checkpointListC1 = new ArrayList<>();
-        checkpointListC1.add(cpC1T1P0);
+        // 'checkpointMapC1' denotes 'checkpoint' map for consumer1
+        Map<TopicPartition, Checkpoint> checkpointMapC1 = new HashMap<>();
+        checkpointMapC1.put(cpC1T1P0.topicPartition(), cpC1T1P0);
 
-        // 'checkpointListC2' denotes 'checkpoint' list for consumer2
-        List<Checkpoint> checkpointListC2 = new ArrayList<>();
-        checkpointListC2.add(cpC2T2P0);
+        // 'checkpointMapC2' denotes 'checkpoint' map for consumer2
+        Map<TopicPartition, Checkpoint> checkpointMapC2 = new HashMap<>();
+        checkpointMapC2.put(cpC2T2P0.topicPartition(), cpC2T2P0);
 
-        checkpointsPerConsumerGroup.put(consumer1, checkpointListC1);
-        checkpointsPerConsumerGroup.put(consumer2, checkpointListC2);
+        checkpointsPerConsumerGroup.put(consumer1, checkpointMapC1);
+        checkpointsPerConsumerGroup.put(consumer2, checkpointMapC2);
 
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), null, idleConsumerGroupsOffset, checkpointsPerConsumerGroup);
@@ -195,4 +195,52 @@ public class MirrorCheckpointTaskTest {
         Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
         assertFalse(checkpoint.isPresent());
     }
+
+    @Test
+    public void testCheckpointRecordsMonotonicIfStoreRewinds() {
+        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
+        Map<String, Map<TopicPartition, Checkpoint>> checkpointsPerConsumerGroup = new HashMap<>();
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+                new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), checkpointsPerConsumerGroup);
+        TopicPartition tp = new TopicPartition("topic1", 0);
+        TopicPartition targetTP = new TopicPartition("source1.topic1", 0);
+
+        long upstream = 11L;
+        long downstream = 4L;
+        // Emit syncs 0 and 1, and use sync 1 to translate offsets and commit checkpoints
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        long consumerGroupOffset = upstream;
+        long expectedDownstreamOffset = downstream;
+        assertEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
+        Map<TopicPartition, Checkpoint> checkpoints = assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, true);
+
+        // the task normally does this, but simulate it here
+        checkpointsPerConsumerGroup.put("g1", checkpoints);
+
+        // Emit syncs 2-6 which will cause the store to drop sync 1, forcing translation to fall back to 0.
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        offsetSyncStore.sync(tp, upstream++, downstream++);
+        // The OffsetSyncStore will change its translation of the same offset
+        assertNotEquals(OptionalLong.of(expectedDownstreamOffset), offsetSyncStore.translateDownstream("g1", tp, consumerGroupOffset));
+        // But the task will filter this out and not emit a checkpoint
+        assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset, false);
+
+        // If then the upstream offset rewinds in the topic and is still translatable, a checkpoint will be emitted
+        // also rewinding the downstream offsets to match. This will not affect auto-synced groups, only checkpoints.
+        assertCheckpointForTopic(mirrorCheckpointTask, tp, targetTP, consumerGroupOffset - 1, true);
+    }
+
+    private Map<TopicPartition, Checkpoint> assertCheckpointForTopic(
+            MirrorCheckpointTask task, TopicPartition tp, TopicPartition remoteTp, long consumerGroupOffset, boolean truth
+    ) {
+        Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets = Collections.singletonMap(tp, new OffsetAndMetadata(consumerGroupOffset));
+        Map<TopicPartition, Checkpoint> checkpoints = task.checkpointsForGroup(consumerGroupOffsets, "g1");
+        assertEquals(truth, checkpoints.containsKey(remoteTp), "should" + (truth ? "" : " not") + " emit offset sync");
+        return checkpoints;
+    }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 163e5b72250..b2fa9fa4da3 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test;
 import java.util.OptionalLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class OffsetSyncStoreTest {
 
@@ -57,22 +58,22 @@ public class OffsetSyncStoreTest {
 
             // Emit synced downstream offset without dead-reckoning
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 150));
 
             // Translate exact offsets
             store.sync(tp, 150, 251);
-            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+            assertEquals(OptionalLong.of(251), store.translateDownstream(null, tp, 150));
 
             // Use old offset (5) prior to any sync -> can't translate
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 5));
 
             // Downstream offsets reset
             store.sync(tp, 200, 10);
-            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(10), store.translateDownstream(null, tp, 200));
 
             // Upstream offsets reset
             store.sync(tp, 20, 20);
-            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+            assertEquals(OptionalLong.of(20), store.translateDownstream(null, tp, 20));
         }
     }
 
@@ -80,21 +81,21 @@ public class OffsetSyncStoreTest {
     public void testNoTranslationIfStoreNotStarted() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             // no offsets exist and store is not started
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // read a sync during startup
             store.sync(tp, 100, 200);
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 200));
 
             // After the store is started all offsets are visible
             store.start();
-            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
-            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
-            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(null, tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(null, tp, 200));
         }
     }
 
@@ -102,7 +103,132 @@ public class OffsetSyncStoreTest {
     public void testNoTranslationIfNoOffsetSync() {
         try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
             store.start();
-            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(null, tp, 0));
+        }
+    }
+
+    @Test
+    public void testPastOffsetTranslation() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            long maxOffsetLag = 10; // spacing between consecutive syncs fed to the store
+            int offset = 0; // shared by both loops, so the second loop continues where the first one stopped
+            for (; offset <= 1000; offset += maxOffsetLag) { // syncs delivered before start()
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+            store.start();
+
+            // After starting but before seeing new offsets, only the latest startup offset can be translated
+            assertSparseSync(store, 1000, -1);
+
+            for (; offset <= 10000; offset += maxOffsetLag) { // syncs delivered after start()
+                store.sync(tp, offset, offset);
+                assertSparseSyncInvariant(store, tp);
+            }
+
+            // After seeing new offsets, we still cannot translate earlier than the latest startup offset
+            // Invariant D: the last sync from the initial read-to-end is still stored
+            assertSparseSync(store, 1000, -1);
+
+            // We can translate offsets between the latest startup offset and the latest offset with variable precision
+            // Older offsets are less precise and translation ends up farther apart
+            // Each call names a sync expected to be retained and the next-older retained sync (second argument)
+            assertSparseSync(store, 4840, 1000);
+            assertSparseSync(store, 6760, 4840);
+            assertSparseSync(store, 8680, 6760);
+            assertSparseSync(store, 9160, 8680);
+            assertSparseSync(store, 9640, 9160);
+            assertSparseSync(store, 9880, 9640);
+            assertSparseSync(store, 9940, 9880);
+            assertSparseSync(store, 9970, 9940);
+            assertSparseSync(store, 9990, 9970);
+            assertSparseSync(store, 10000, 9990);
+
+            // Rewinding upstream offsets should clear all historical syncs
+            store.sync(tp, 1500, 11000);
+            assertSparseSyncInvariant(store, tp);
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(null, tp, 1499)); // just before the new sync: not translatable
+            assertEquals(OptionalLong.of(11000), store.translateDownstream(null, tp, 1500)); // exact match on the new sync
+            assertEquals(OptionalLong.of(11001), store.translateDownstream(null, tp, 2000)); // past the new sync: its downstream offset + 1
+        }
+    }
+
+    @Test
+    public void testKeepMostDistinctSyncs() {
+        // Under normal operation, the incoming syncs will be regularly spaced and the store should keep a set of syncs
+        // which provide the best translation accuracy (expires as few syncs as possible)
+        // Each new sync should be added to the cache and expire at most one other sync from the cache
+        long iterations = 10000;
+        long maxStep = Long.MAX_VALUE / iterations;
+        // Test a variety of steps (corresponding to the offset.lag.max configuration)
+        for (long step = 1; step < maxStep; step = (step * 2) + 1) { // steps 1, 3, 7, 15, ... (2^k - 1)
+            for (long firstOffset = 0; firstOffset < 30; firstOffset++) {
+                try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+                    int lastCount = 1; // distinct syncs seen after the previous iteration
+                    store.start();
+                    for (long offset = firstOffset; offset <= iterations; offset += step) {
+                        store.sync(tp, offset, offset);
+                        // Invariant A: the latest sync is present (slot 0)
+                        assertEquals(offset, store.syncFor(tp, 0).upstreamOffset());
+                        // Invariant D: the earliest sync is present (last slot, named via the constant instead of a literal 63)
+                        assertEquals(firstOffset, store.syncFor(tp, OffsetSyncStore.SYNCS_PER_PARTITION - 1).upstreamOffset());
+                        int count = countDistinctStoredSyncs(store, tp);
+                        int diff = count - lastCount; // negative means more syncs were expired than added
+                        assertTrue(diff >= 0,
+                                "Store expired too many syncs: " + diff + " after receiving offset " + offset);
+                        lastCount = count;
+                    }
+                }
+            }
+        }
+    }
+
+    private void assertSparseSync(FakeOffsetSyncStore store, long syncOffset, long previousOffset) { // previousOffset: next-older stored sync, or -1 if none
+        assertEquals(OptionalLong.of(previousOffset == -1 ? previousOffset : previousOffset + 1), store.translateDownstream(null, tp, syncOffset - 1)); // just before the sync: previous sync + 1, or -1
+        assertEquals(OptionalLong.of(syncOffset), store.translateDownstream(null, tp, syncOffset)); // exact match on the sync
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 1)); // just past the sync: sync + 1
+        assertEquals(OptionalLong.of(syncOffset + 1), store.translateDownstream(null, tp, syncOffset + 2)); // further past: still sync + 1
+    }
+
+    private int countDistinctStoredSyncs(FakeOffsetSyncStore store, TopicPartition topicPartition) { // counts runs of distinct syncs across all slots
+        int count = 1; // slot 0 always contributes one sync
+        for (int i = 1; i < OffsetSyncStore.SYNCS_PER_PARTITION; i++) {
+            if (store.syncFor(topicPartition, i - 1) != store.syncFor(topicPartition, i)) { // reference comparison; presumably adjacent slots share one instance when they hold the same sync - TODO confirm
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private void assertSparseSyncInvariant(FakeOffsetSyncStore store, TopicPartition topicPartition) { // checks invariants B and C for every slot pair i < j
+        for (int j = 0; j < OffsetSyncStore.SYNCS_PER_PARTITION; j++) {
+            for (int i = 0; i < j; i++) {
+                long jUpstream = store.syncFor(topicPartition, j).upstreamOffset();
+                long iUpstream = store.syncFor(topicPartition, i).upstreamOffset();
+                if (jUpstream == iUpstream) { // equal upstream offsets: no spacing to check for this pair
+                    continue;
+                }
+                int exponent = Math.max(i - 2, 0);
+                long iUpstreamLowerBound = jUpstream + (1L << exponent);
+                if (iUpstreamLowerBound < 0) { // negative bound (e.g. the sum overflowed long): skip the check
+                    continue;
+                }
+                assertTrue(
+                        iUpstream >= iUpstreamLowerBound,
+                        "Invariant C(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be at least " + iUpstreamLowerBound
+                                + " (" + jUpstream + " + 2^" + exponent + ")"
+                );
+                long iUpstreamUpperBound = jUpstream + (1L << j) - (1L << i);
+                if (iUpstreamUpperBound < 0) { // negative bound (e.g. the arithmetic overflowed long): skip the check
+                    continue;
+                }
+                assertTrue(
+                        iUpstream <= iUpstreamUpperBound,
+                        "Invariant B(" + i + "," + j + "): Upstream offset " + iUpstream + " at position " + i
+                                + " should be no greater than " + iUpstreamUpperBound
+                                + " (" + jUpstream + " + 2^" + j + " - 2^" + i + ")"
+                );
+            }
         }
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index c9b2d54e6dc..5bd12f56a97 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -504,7 +504,6 @@ public class MirrorConnectorsIntegrationBaseTest {
 
             waitForConsumerGroupFullSync(backup, Arrays.asList(backupTopic1, remoteTopic2),
                     consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
             assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
         }
 
@@ -650,6 +649,71 @@ public class MirrorConnectorsIntegrationBaseTest {
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
+    @Test
+    public void testOffsetTranslationBehindReplicationFlow() throws InterruptedException {
+        String consumerGroupName = "consumer-group-lagging-behind";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        warmUpConsumer(consumerProps);
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Props.put("offset.lag.max", Integer.toString(OFFSET_LAG_MAX));
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        // Produce a large number of records to the topic, all replicated within one MM2 lifetime.
+        int iterations = 100;
+        for (int i = 0; i < iterations; i++) {
+            produceMessages(primary, "test-topic-1");
+        }
+        waitForTopicCreated(backup, remoteTopic);
+        assertEquals(iterations * NUM_RECORDS_PRODUCED, backup.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, remoteTopic).count(),
+                "Records were not replicated to backup cluster.");
+        // Once the replication has finished, we spin up the upstream consumer and start slowly consuming records
+        ConsumerRecords<byte[], byte[]> allRecords = primary.kafka().consume(iterations * NUM_RECORDS_PRODUCED, RECORD_CONSUME_DURATION_MS, "test-topic-1");
+        try (MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS))) { // closed even if an assertion below fails
+            Map<TopicPartition, OffsetAndMetadata> initialCheckpoints = waitForCheckpointOnAllPartitions(backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic);
+            Map<TopicPartition, OffsetAndMetadata> partialCheckpoints;
+            log.info("Initial checkpoints: {}", initialCheckpoints);
+            try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+                primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                primaryConsumer.commitSync(partialOffsets(allRecords, 0.9f)); // advance the group to roughly 90% of each partition
+                partialCheckpoints = waitForNewCheckpointOnAllPartitions(
+                        backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, initialCheckpoints);
+                log.info("Partial checkpoints: {}", partialCheckpoints);
+            }
+
+            for (TopicPartition tp : initialCheckpoints.keySet()) {
+                assertTrue(initialCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
+                        "Checkpoints should advance when the upstream consumer group advances");
+            }
+
+            assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+
+            Map<TopicPartition, OffsetAndMetadata> finalCheckpoints;
+            try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+                primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                primaryConsumer.commitSync(partialOffsets(allRecords, 0.1f)); // rewind the group to roughly 10% of each partition
+                finalCheckpoints = waitForNewCheckpointOnAllPartitions(
+                        backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, remoteTopic, partialCheckpoints);
+                log.info("Final checkpoints: {}", finalCheckpoints);
+            }
+
+            for (TopicPartition tp : partialCheckpoints.keySet()) {
+                assertTrue(finalCheckpoints.get(tp).offset() < partialCheckpoints.get(tp).offset(),
+                        "Checkpoints should rewind when the upstream consumer group rewinds");
+            }
+        }
+    }
+
+    private Map<TopicPartition, OffsetAndMetadata> partialOffsets(ConsumerRecords<byte[], byte[]> allRecords, double fraction) { // per partition: offset of the record at position fraction * size
+        return allRecords.partitions()
+                .stream()
+                .collect(Collectors.toMap(Function.identity(), partition -> {
+                    List<ConsumerRecord<byte[], byte[]>> records = allRecords.records(partition);
+                    int index = Math.max(0, Math.min((int) (records.size() * fraction), records.size() - 1)); // clamp: fraction >= 1.0 or < 0 must not index out of bounds
+                    return new OffsetAndMetadata(records.get(index).offset());
+                }));
+    }
 
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
@@ -790,6 +854,13 @@ public class MirrorConnectorsIntegrationBaseTest {
 
     private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
             MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        return waitForNewCheckpointOnAllPartitions(client, consumerGroupName, remoteClusterAlias, topicName, Collections.emptyMap()); // empty "last checkpoint" map: no previous value to compare against
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForNewCheckpointOnAllPartitions(
+                MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName,
+                Map<TopicPartition, OffsetAndMetadata> lastCheckpoint
     ) throws InterruptedException {
         AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
         waitForCondition(
@@ -797,9 +868,13 @@ public class MirrorConnectorsIntegrationBaseTest {
                     Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
                             consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
                     for (int i = 0; i < NUM_PARTITIONS; i++) {
-                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                        TopicPartition tp = new TopicPartition(topicName, i);
+                        if (!offsets.containsKey(tp)) {
                             log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
                             return false;
+                        } else if (lastCheckpoint.containsKey(tp) && lastCheckpoint.get(tp).equals(offsets.get(tp))) {
+                            log.info("Checkpoint is the same as previous checkpoint");
+                            return false;
                         }
                     }
                     ret.set(offsets);
@@ -859,9 +934,12 @@ public class MirrorConnectorsIntegrationBaseTest {
                 for (TopicPartition tp : tps) {
                     assertTrue(consumerGroupOffsets.containsKey(tp),
                             "TopicPartition " + tp + " does not have translated offsets");
-                    assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp) - offsetLagMax,
-                            "TopicPartition " + tp + " does not have fully-translated offsets");
-                    assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                    long offset = consumerGroupOffsets.get(tp).offset();
+                    assertTrue(offset > lastOffset.get(tp) - offsetLagMax,
+                            "TopicPartition " + tp + " does not have fully-translated offsets: "
+                                    + offset + " is not close enough to " + lastOffset.get(tp)
+                                    + " (strictly more than " + (lastOffset.get(tp) - offsetLagMax) + ")");
+                    assertTrue(offset <= endOffsets.get(tp).offset(),
                             "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
                 }
                 return true;
@@ -953,11 +1031,9 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // create these topics before starting the connectors so we don't need to wait for discovery
         primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, topicConfig, adminClientConfig);
-        primary.kafka().createTopic("backup.test-topic-1", 1, 1, emptyMap, adminClientConfig);
         primary.kafka().createTopic("heartbeats", 1, 1, emptyMap, adminClientConfig);
         primary.kafka().createTopic("test-topic-no-checkpoints", 1, 1, emptyMap, adminClientConfig);
         backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, emptyMap, adminClientConfig);
-        backup.kafka().createTopic("primary.test-topic-1", 1, 1, emptyMap, adminClientConfig);
         backup.kafka().createTopic("heartbeats", 1, 1, emptyMap, adminClientConfig);
 
         // This can speed up some test cases


[kafka] 02/03: MINOR: Refactor Mirror integration tests to reduce duplication (#13428)

Posted by ce...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7127a2257053c4d3b586f8d89e90d2c4bd26633e
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Fri Mar 24 08:18:26 2023 -0700

    MINOR: Refactor Mirror integration tests to reduce duplication (#13428)
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../IdentityReplicationIntegrationTest.java        | 213 +--------------------
 .../MirrorConnectorsIntegrationBaseTest.java       | 183 +++++++++++-------
 2 files changed, 115 insertions(+), 281 deletions(-)

diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index f8489d7a40c..8dc04e60747 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -16,28 +16,12 @@
  */
 package org.apache.kafka.connect.mirror.integration;
 
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.config.TopicConfig;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.connect.mirror.IdentityReplicationPolicy;
-import org.apache.kafka.connect.mirror.MirrorClient;
-import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
-import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
 
 import org.junit.jupiter.api.Tag;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.BeforeEach;
 
 /**
@@ -52,208 +36,13 @@ import org.junit.jupiter.api.BeforeEach;
 public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrationBaseTest {
     @BeforeEach
     public void startClusters() throws Exception {
+        replicateBackupToPrimary = false;
         super.startClusters(new HashMap<String, String>() {{
                 put("replication.policy.class", IdentityReplicationPolicy.class.getName());
                 put("topics", "test-topic-.*");
-                put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
-                put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
             }});
     }
 
-    @Test
-    public void testReplication() throws Exception {
-        produceMessages(primary, "test-topic-1");
-        String consumerGroupName = "consumer-group-testReplication";
-        Map<String, Object> consumerProps = new HashMap<String, Object>() {{
-                put("group.id", consumerGroupName);
-                put("auto.offset.reset", "latest");
-            }};
-        // warm up consumers before starting the connectors so we don't need to wait for discovery
-        warmUpConsumer(consumerProps);
-
-        mm2Config = new MirrorMakerConfig(mm2Props);
-
-        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-        waitUntilMirrorMakerIsRunning(primary, Collections.singletonList(MirrorHeartbeatConnector.class), mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
-
-        MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
-        MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
-
-        // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(primary, "test-topic-1");
-        waitForTopicCreated(backup, "test-topic-1");
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
-                "topic config was not synced");
-        createAndTestNewTopicWithConfigFilter();
-
-        assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
-                "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
-                "Records were not replicated to backup cluster.");
-
-        assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
-                "Heartbeats were not emitted to primary cluster.");
-        assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
-                "Heartbeats were not emitted to backup cluster.");
-        assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
-                "Heartbeats were not replicated downstream to backup cluster.");
-        assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
-                "Heartbeats were not replicated downstream to primary cluster.");
-
-        assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
-        assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
-        assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
-                "Checkpoints were not emitted downstream to backup cluster.");
-
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
-                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
-
-        // Failover consumer group to backup cluster.
-        try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
-            primaryConsumer.assign(backupOffsets.keySet());
-            backupOffsets.forEach(primaryConsumer::seek);
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            primaryConsumer.commitAsync();
-
-            assertTrue(primaryConsumer.position(new TopicPartition("test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
-            assertTrue(primaryConsumer.position(
-                    new TopicPartition("test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
-        }
-
-        primaryClient.close();
-        backupClient.close();
-
-        // create more matching topics
-        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
-
-        // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(backup, "test-topic-2");
-
-        // only produce messages to the first partition
-        produceMessages(primary, "test-topic-2", 1);
-
-        // expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
-        assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
-                "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
-                "New topic was not replicated to backup cluster.");
-    }
-
-    @Test
-    public void testReplicationWithEmptyPartition() throws Exception {
-        String consumerGroupName = "consumer-group-testReplicationWithEmptyPartition";
-        Map<String, Object> consumerProps  = Collections.singletonMap("group.id", consumerGroupName);
-
-        // create topic
-        String topic = "test-topic-with-empty-partition";
-        primary.kafka().createTopic(topic, NUM_PARTITIONS);
-
-        // produce to all test-topic-empty's partitions, except the last partition
-        produceMessages(primary, topic, NUM_PARTITIONS - 1);
-
-        // consume before starting the connectors so we don't need to wait for discovery
-        int expectedRecords = NUM_RECORDS_PER_PARTITION * (NUM_PARTITIONS - 1);
-        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, topic)) {
-            waitForConsumingAllRecords(primaryConsumer, expectedRecords);
-        }
-
-        // one way replication from primary to backup
-        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
-        mm2Config = new MirrorMakerConfig(mm2Props);
-        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-
-        // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
-        Thread.sleep(TimeUnit.SECONDS.toMillis(3));
-
-        // note that with IdentityReplicationPolicy, topics on the backup are NOT renamed to PRIMARY_CLUSTER_ALIAS + "." + topic
-        String backupTopic = topic;
-
-        // consume all records from backup cluster
-        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
-                backupTopic)) {
-            waitForConsumingAllRecords(backupConsumer, expectedRecords);
-        }
-
-        try (Admin backupClient = backup.kafka().createAdminClient()) {
-            // retrieve the consumer group offset from backup cluster
-            Map<TopicPartition, OffsetAndMetadata> remoteOffsets =
-                    backupClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
-
-            // pinpoint the offset of the last partition which does not receive records
-            OffsetAndMetadata offset = remoteOffsets.get(new TopicPartition(backupTopic, NUM_PARTITIONS - 1));
-            // offset of the last partition should exist, but its value should be 0
-            assertNotNull(offset, "Offset of last partition was not replicated");
-            assertEquals(0, offset.offset(), "Offset of last partition is not zero");
-        }
-    }
-
-    @Override
-    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
-        produceMessages(primary, "test-topic-1");
-        String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
-        Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
-                put("group.id", consumerGroupName);
-                put("auto.offset.reset", "earliest");
-            }};
-        // create consumers before starting the connectors so we don't need to wait for discovery
-        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps,
-                "test-topic-1")) {
-            // we need to wait for consuming all the records for MM2 replicating the expected offsets
-            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
-        }
-
-        // enable automated consumer group offset sync
-        mm2Props.put("sync.group.offsets.enabled", "true");
-        mm2Props.put("sync.group.offsets.interval.seconds", "1");
-        mm2Props.put("offset.lag.max", Integer.toString(offsetLagMax));
-        // one way replication from primary to backup
-        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
-
-        mm2Config = new MirrorMakerConfig(mm2Props);
-
-        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-
-        // make sure the topic is created in the other cluster
-        waitForTopicCreated(primary, "backup.test-topic-1");
-        waitForTopicCreated(backup, "test-topic-1");
-        // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-                consumerProps, "test-topic-1")) {
-
-            waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
-            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
-        }
-
-        // now create a new topic in primary cluster
-        primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
-        // make sure the topic is created in backup cluster
-        waitForTopicCreated(backup, "test-topic-2");
-
-        // produce some records to the new topic in primary cluster
-        produceMessages(primary, "test-topic-2");
-
-        // create a consumer at primary cluster to consume the new topic
-        try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "test-topic-2")) {
-            // we need to wait for consuming all the records for MM2 replicating the expected offsets
-            waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
-        }
-
-        // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
-
-            waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
-                    consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
-
-            assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
-        }
-
-        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
-    }
-
     /*
      * Returns expected topic name on target cluster.
      */
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 111dbef06d2..c9b2d54e6dc 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -112,7 +112,9 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected MirrorMakerConfig mm2Config;
     protected EmbeddedConnectCluster primary;
     protected EmbeddedConnectCluster backup;
-    
+
+    protected boolean replicateBackupToPrimary = true;
+    protected Boolean createReplicatedTopicsUpfront = false; // enable to speed up the test cases
     protected Exit.Procedure exitProcedure;
     private Exit.Procedure haltProcedure;
     
@@ -125,8 +127,6 @@ public class MirrorConnectorsIntegrationBaseTest {
     public void startClusters() throws Exception {
         startClusters(new HashMap<String, String>() {{
                 put("topics", "test-topic-.*, primary.test-topic-.*, backup.test-topic-.*");
-                put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
-                put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "true");
             }});
     }
 
@@ -163,6 +163,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         backupBrokerProps.put("auto.create.topics.enable", "false");
 
         mm2Props.putAll(basicMM2Config());
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + "->" + BACKUP_CLUSTER_ALIAS + ".enabled", "true");
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", Boolean.toString(replicateBackupToPrimary));
         mm2Props.putAll(additionalMM2Config);
 
         // exclude topic config:
@@ -249,7 +251,11 @@ public class MirrorConnectorsIntegrationBaseTest {
     @Test
     public void testReplication() throws Exception {
         produceMessages(primary, "test-topic-1");
-        produceMessages(backup, "test-topic-1");
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        if (replicateBackupToPrimary) {
+            produceMessages(backup, "test-topic-1");
+        }
+        String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
         String consumerGroupName = "consumer-group-testReplication";
         Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
         // warm up consumers before starting the connectors so we don't need to wait for discovery
@@ -258,51 +264,59 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Config = new MirrorMakerConfig(mm2Props);
 
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
-        waitUntilMirrorMakerIsRunning(primary, CONNECTOR_LIST, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS); 
+        List<Class<? extends Connector>> primaryConnectors = replicateBackupToPrimary ? CONNECTOR_LIST : Collections.singletonList(MirrorHeartbeatConnector.class);
+        waitUntilMirrorMakerIsRunning(primary, primaryConnectors, mm2Config, BACKUP_CLUSTER_ALIAS, PRIMARY_CLUSTER_ALIAS);
 
         MirrorClient primaryClient = new MirrorClient(mm2Config.clientConfig(PRIMARY_CLUSTER_ALIAS));
         MirrorClient backupClient = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS));
 
         // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(primary, "backup.test-topic-1");
-        waitForTopicCreated(backup, "primary.test-topic-1");
+        waitForTopicCreated(primary, reverseTopic1);
+        waitForTopicCreated(backup, backupTopic1);
         waitForTopicCreated(primary, "mm2-offset-syncs.backup.internal");
-        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), "primary.test-topic-1", TopicConfig.CLEANUP_POLICY_CONFIG),
+        assertEquals(TopicConfig.CLEANUP_POLICY_COMPACT, getTopicConfig(backup.kafka(), backupTopic1, TopicConfig.CLEANUP_POLICY_CONFIG),
                 "topic config was not synced");
         createAndTestNewTopicWithConfigFilter();
 
         assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
             "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1").count(),
+        assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic1).count(),
             "Records were not replicated to backup cluster.");
         assertEquals(NUM_RECORDS_PRODUCED, backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "test-topic-1").count(),
             "Records were not produced to backup cluster.");
-        assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1").count(),
-            "Records were not replicated to primary cluster.");
-        
-        assertEquals(NUM_RECORDS_PRODUCED * 2, primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "backup.test-topic-1", "test-topic-1").count(),
-            "Primary cluster doesn't have all records from both clusters.");
-        assertEquals(NUM_RECORDS_PRODUCED * 2, backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, "primary.test-topic-1", "test-topic-1").count(),
-            "Backup cluster doesn't have all records from both clusters.");
-        
+        if (replicateBackupToPrimary) {
+            assertEquals(NUM_RECORDS_PRODUCED, primary.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, reverseTopic1).count(),
+                    "Records were not replicated to primary cluster.");
+            assertEquals(NUM_RECORDS_PRODUCED * 2, primary.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, reverseTopic1, "test-topic-1").count(),
+                "Primary cluster doesn't have all records from both clusters.");
+            assertEquals(NUM_RECORDS_PRODUCED * 2, backup.kafka().consume(NUM_RECORDS_PRODUCED * 2, RECORD_TRANSFER_DURATION_MS, backupTopic1, "test-topic-1").count(),
+                "Backup cluster doesn't have all records from both clusters.");
+        }
+
         assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
             "Heartbeats were not emitted to primary cluster.");
         assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "heartbeats").count() > 0,
             "Heartbeats were not emitted to backup cluster.");
         assertTrue(backup.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "primary.heartbeats").count() > 0,
             "Heartbeats were not replicated downstream to backup cluster.");
-        assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0,
-            "Heartbeats were not replicated downstream to primary cluster.");
+        if (replicateBackupToPrimary) {
+            assertTrue(primary.kafka().consume(1, RECORD_TRANSFER_DURATION_MS, "backup.heartbeats").count() > 0,
+                    "Heartbeats were not replicated downstream to primary cluster.");
+        }
         
         assertTrue(backupClient.upstreamClusters().contains(PRIMARY_CLUSTER_ALIAS), "Did not find upstream primary cluster.");
         assertEquals(1, backupClient.replicationHops(PRIMARY_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
-        assertTrue(primaryClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), "Did not find upstream backup cluster.");
-        assertEquals(1, primaryClient.replicationHops(BACKUP_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
             "Checkpoints were not emitted downstream to backup cluster.");
+        if (replicateBackupToPrimary) {
+            assertTrue(primaryClient.upstreamClusters().contains(BACKUP_CLUSTER_ALIAS), "Did not find upstream backup cluster.");
+            assertEquals(1, primaryClient.replicationHops(BACKUP_CLUSTER_ALIAS), "Did not calculate replication hops correctly.");
+            assertTrue(primary.kafka().consume(1, CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0,
+                    "Checkpoints were not emitted upstream to primary cluster.");
+        }
 
         Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
-                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, backupTopic1);
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -311,55 +325,61 @@ public class MirrorConnectorsIntegrationBaseTest {
             primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
             primaryConsumer.commitAsync();
 
-            assertTrue(primaryConsumer.position(new TopicPartition("primary.test-topic-1", 0)) > 0, "Consumer failedover to zero offset.");
+            assertTrue(primaryConsumer.position(new TopicPartition(backupTopic1, 0)) > 0, "Consumer failedover to zero offset.");
             assertTrue(primaryConsumer.position(
-                new TopicPartition("primary.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
-            assertTrue(primary.kafka().consume(1, CHECKPOINT_DURATION_MS, "backup.checkpoints.internal").count() > 0,
-                "Checkpoints were not emitted upstream to primary cluster.");
+                new TopicPartition(backupTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedover beyond expected offset.");
         }
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
-                primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
-
         assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
  
         primaryClient.close();
         backupClient.close();
-        
-        // Failback consumer group to primary cluster
-        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
-            primaryConsumer.assign(primaryOffsets.keySet());
-            primaryOffsets.forEach(primaryConsumer::seek);
-            primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-            primaryConsumer.commitAsync();
 
-            assertTrue(primaryConsumer.position(new TopicPartition("backup.test-topic-1", 0)) > 0, "Consumer failedback to zero downstream offset.");
-            assertTrue(primaryConsumer.position(
-                new TopicPartition("backup.test-topic-1", 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
+        if (replicateBackupToPrimary) {
+            Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+                    primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, reverseTopic1);
+
+            // Failback consumer group to primary cluster
+            try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
+                primaryConsumer.assign(primaryOffsets.keySet());
+                primaryOffsets.forEach(primaryConsumer::seek);
+                primaryConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                primaryConsumer.commitAsync();
+
+                assertTrue(primaryConsumer.position(new TopicPartition(reverseTopic1, 0)) > 0, "Consumer failedback to zero downstream offset.");
+                assertTrue(primaryConsumer.position(
+                        new TopicPartition(reverseTopic1, 0)) <= NUM_RECORDS_PRODUCED, "Consumer failedback beyond expected downstream offset.");
+            }
+
         }
-      
+
         // create more matching topics
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
-        backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+        String backupTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
 
         // make sure the topic is auto-created in the other cluster
-        waitForTopicCreated(backup, "primary.test-topic-2");
-        waitForTopicCreated(primary, "backup.test-topic-3");
+        waitForTopicCreated(backup, backupTopic2);
 
         // only produce messages to the first partition
         produceMessages(primary, "test-topic-2", 1);
-        produceMessages(backup, "test-topic-3", 1);
-        
+
         // expect total consumed messages equals to NUM_RECORDS_PER_PARTITION
         assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-2").count(),
             "Records were not produced to primary cluster.");
-        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count(),
-            "Records were not produced to backup cluster.");
-
-        assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "backup.test-topic-3").count(),
-            "New topic was not replicated to primary cluster.");
-        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, "primary.test-topic-2").count(),
+        assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, backupTopic2).count(),
             "New topic was not replicated to backup cluster.");
+
+        if (replicateBackupToPrimary) {
+            backup.kafka().createTopic("test-topic-3", NUM_PARTITIONS);
+            String reverseTopic3 = remoteTopicName("test-topic-3", BACKUP_CLUSTER_ALIAS);
+            waitForTopicCreated(primary, reverseTopic3);
+            produceMessages(backup, "test-topic-3", 1);
+            assertEquals(NUM_RECORDS_PER_PARTITION, backup.kafka().consume(NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, "test-topic-3").count(),
+                    "Records were not produced to backup cluster.");
+
+            assertEquals(NUM_RECORDS_PER_PARTITION, primary.kafka().consume(NUM_RECORDS_PER_PARTITION, 2 * RECORD_TRANSFER_DURATION_MS, reverseTopic3).count(),
+                    "New topic was not replicated to primary cluster.");
+        }
     }
     
     @Test
@@ -388,7 +408,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         // sleep few seconds to have MM2 finish replication so that "end" consumer will consume some record
         Thread.sleep(TimeUnit.SECONDS.toMillis(3));
 
-        String backupTopic = PRIMARY_CLUSTER_ALIAS + "." + topic;
+        String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
 
         // consume all records from backup cluster
         try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps,
@@ -419,8 +439,9 @@ public class MirrorConnectorsIntegrationBaseTest {
         testOneWayReplicationWithOffsetSyncs(0);
     }
 
-    public void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
+    private void testOneWayReplicationWithOffsetSyncs(int offsetLagMax) throws InterruptedException {
         produceMessages(primary, "test-topic-1");
+        String backupTopic1 = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
         String consumerGroupName = "consumer-group-testOneWayReplicationWithAutoOffsetSync";
         Map<String, Object> consumerProps  = new HashMap<String, Object>() {{
                 put("group.id", consumerGroupName);
@@ -445,14 +466,17 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
 
-        // make sure the topic is created in the other cluster
-        waitForTopicCreated(primary, "backup.test-topic-1");
-        waitForTopicCreated(backup, "primary.test-topic-1");
+        // one-way replication: the reverse topic must not appear on the primary cluster; only the backup cluster gets the replicated topic
+        String reverseTopic1 = remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS);
+        if (!"test-topic-1".equals(reverseTopic1)) {
+            topicShouldNotBeCreated(primary, reverseTopic1);
+        }
+        waitForTopicCreated(backup, backupTopic1);
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
         try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-            consumerProps, "primary.test-topic-1")) {
+            consumerProps, backupTopic1)) {
 
-            waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+            waitForConsumerGroupFullSync(backup, Collections.singletonList(backupTopic1),
                     consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
             assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
@@ -461,7 +485,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
         // make sure the topic is created in backup cluster
-        waitForTopicCreated(backup, "primary.test-topic-2");
+        String remoteTopic2 = remoteTopicName("test-topic-2", PRIMARY_CLUSTER_ALIAS);
+        waitForTopicCreated(backup, remoteTopic2);
 
         // produce some records to the new topic in primary cluster
         produceMessages(primary, "test-topic-2");
@@ -475,9 +500,9 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
         try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+            "group.id", consumerGroupName), backupTopic1, remoteTopic2)) {
 
-            waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+            waitForConsumerGroupFullSync(backup, Arrays.asList(backupTopic1, remoteTopic2),
                     consumerGroupName, NUM_RECORDS_PRODUCED, offsetLagMax);
 
             assertDownstreamRedeliveriesBoundedByMaxLag(backupConsumer, offsetLagMax);
@@ -688,7 +713,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
-    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+    private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
         for (Class<? extends Connector> connector : connectorClasses) {
             connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
         }
@@ -743,7 +768,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * produce messages to the cluster and topic partition less than numPartitions 
      */
-    protected void produceMessages(EmbeddedConnectCluster cluster, String topicName, int numPartitions) {
+    private void produceMessages(EmbeddedConnectCluster cluster, String topicName, int numPartitions) {
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
@@ -763,7 +788,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         cluster.produce(topic, partition, key, value);
     }
 
-    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+    private static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
             MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
     ) throws InterruptedException {
         AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
@@ -795,7 +820,7 @@ public class MirrorConnectorsIntegrationBaseTest {
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupFullSync(
+    private static <T> void waitForConsumerGroupFullSync(
             EmbeddedConnectCluster connect,
             List<String> topics,
             String consumerGroupId,
@@ -844,7 +869,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
-    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+    private static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
         TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
         try (Consumer<byte[], byte[]> backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
                 "auto.offset.reset", "earliest"), checkpointTopic)) {
@@ -870,7 +895,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
-    protected static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
+    private static void assertDownstreamRedeliveriesBoundedByMaxLag(Consumer<byte[], byte[]> targetConsumer, int offsetLagMax) {
         ConsumerRecords<byte[], byte[]> records = targetConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
         // After a full sync, there should be at most offset.lag.max records per partition consumed by both upstream and downstream consumers.
         for (TopicPartition tp : records.partitions()) {
@@ -882,7 +907,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * make sure the consumer to consume expected number of records
      */
-    protected static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
+    private static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int numExpectedRecords)
             throws InterruptedException {
         final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
         waitForCondition(() -> {
@@ -895,7 +920,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     /*
      * MM2 config to use in integration tests
      */
-    protected static Map<String, String> basicMM2Config() {
+    private static Map<String, String> basicMM2Config() {
         Map<String, String> mm2Props = new HashMap<>();
         mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
         mm2Props.put("max.tasks", "10");
@@ -934,6 +959,12 @@ public class MirrorConnectorsIntegrationBaseTest {
         backup.kafka().createTopic("test-topic-1", NUM_PARTITIONS, 1, emptyMap, adminClientConfig);
         backup.kafka().createTopic("primary.test-topic-1", 1, 1, emptyMap, adminClientConfig);
         backup.kafka().createTopic("heartbeats", 1, 1, emptyMap, adminClientConfig);
+
+        // This can speed up some test cases
+        if (createReplicatedTopicsUpfront) {
+            primary.kafka().createTopic(remoteTopicName("test-topic-1", BACKUP_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
+            backup.kafka().createTopic(remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS), 1, 1, emptyMap, adminClientConfig);
+        }
     }
 
     /*
@@ -950,6 +981,20 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
+    /*
+     * making sure the topic isn't created on the cluster
+     */
+    private static void topicShouldNotBeCreated(EmbeddedConnectCluster cluster, String topicName) throws InterruptedException {
+        try (final Admin adminClient = cluster.kafka().createAdminClient()) {
+            waitForCondition(() ->
+                    !adminClient.listTopics().names()
+                        .get(REQUEST_TIMEOUT_DURATION_MS, TimeUnit.MILLISECONDS)
+                        .contains(topicName), TOPIC_SYNC_DURATION_MS,
+                "Topic: " + topicName + " get created on cluster: " + cluster.getName()
+            );
+        }
+    }
+
     /*
      * wait for the topic created on the cluster
      */


[kafka] 01/03: KAFKA-14837/14842:Avoid the rebalance caused by the addition and deletion of irrelevant groups for MirrorCheckPointConnector (#13446)

Posted by ce...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b58ceaf6dd9c6e763055d8313084cec57a6d9db7
Author: hudeqi <12...@qq.com>
AuthorDate: Tue Mar 28 21:19:52 2023 +0800

    KAFKA-14837/14842:Avoid the rebalance caused by the addition and deletion of irrelevant groups for MirrorCheckPointConnector (#13446)
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../connect/mirror/MirrorCheckpointConnector.java  | 42 ++++++++++++++++--
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  2 +-
 .../mirror/MirrorCheckpointConnectorTest.java      | 51 +++++++++++++++++++++-
 3 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
index 2e7325f2ff3..e71bef5afe0 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java
@@ -18,6 +18,8 @@ package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Utils;
@@ -30,6 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -46,6 +49,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
 
     private Scheduler scheduler;
     private MirrorConnectorConfig config;
+    private TopicFilter topicFilter;
     private GroupFilter groupFilter;
     private AdminClient sourceAdminClient;
     private SourceAndTarget sourceAndTarget;
@@ -69,6 +73,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
         }
         String connectorName = config.connectorName();
         sourceAndTarget = new SourceAndTarget(config.sourceClusterAlias(), config.targetClusterAlias());
+        topicFilter = config.topicFilter();
         groupFilter = config.groupFilter();
         sourceAdminClient = AdminClient.create(config.sourceAdminConfig());
         scheduler = new Scheduler(MirrorCheckpointConnector.class, config.adminTimeout());
@@ -86,6 +91,7 @@ public class MirrorCheckpointConnector extends SourceConnector {
             return;
         }
         Utils.closeQuietly(scheduler, "scheduler");
+        Utils.closeQuietly(topicFilter, "topic filter");
         Utils.closeQuietly(groupFilter, "group filter");
         Utils.closeQuietly(sourceAdminClient, "source admin client");
     }
@@ -147,10 +153,31 @@ public class MirrorCheckpointConnector extends SourceConnector {
 
     List<String> findConsumerGroups()
             throws InterruptedException, ExecutionException {
-        return listConsumerGroups().stream()
+        List<String> filteredGroups = listConsumerGroups().stream()
                 .map(ConsumerGroupListing::groupId)
-                .filter(this::shouldReplicate)
+                .filter(this::shouldReplicateByGroupFilter)
                 .collect(Collectors.toList());
+
+        List<String> checkpointGroups = new LinkedList<>();
+        List<String> irrelevantGroups = new LinkedList<>();
+
+        for (String group : filteredGroups) {
+            Set<String> consumedTopics = listConsumerGroupOffsets(group).keySet().stream()
+                    .map(TopicPartition::topic)
+                    .filter(this::shouldReplicateByTopicFilter)
+                    .collect(Collectors.toSet());
+            // Only perform checkpoints for groups that have offsets for at least one topic that's accepted
+            // by the topic filter.
+            if (consumedTopics.size() > 0) {
+                checkpointGroups.add(group);
+            } else {
+                irrelevantGroups.add(group);
+            }
+        }
+
+        log.debug("Ignoring the following groups which do not have any offsets for topics that are accepted by " +
+                        "the topic filter: {}", irrelevantGroups);
+        return checkpointGroups;
     }
 
     Collection<ConsumerGroupListing> listConsumerGroups()
@@ -163,7 +190,16 @@ public class MirrorCheckpointConnector extends SourceConnector {
             config.checkpointsTopicReplicationFactor(), config.targetAdminConfig());
     } 
 
-    boolean shouldReplicate(String group) {
+    Map<TopicPartition, OffsetAndMetadata> listConsumerGroupOffsets(String group)
+            throws InterruptedException, ExecutionException {
+        return sourceAdminClient.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();
+    }
+
+    boolean shouldReplicateByGroupFilter(String group) {
         return groupFilter.shouldReplicateGroup(group);
     }
+
+    boolean shouldReplicateByTopicFilter(String topic) {
+        return topicFilter.shouldReplicateTopic(topic);
+    }
 }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 02dcdf0f43e..1ed1e9f0b7d 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -177,7 +177,7 @@ public class MirrorCheckpointTask extends SourceTask {
 
     private List<Checkpoint> checkpointsForGroup(String group) throws ExecutionException, InterruptedException {
         return listConsumerGroupOffsets(group).entrySet().stream()
-            .filter(x -> shouldCheckpointTopic(x.getKey().topic()))
+            .filter(x -> shouldCheckpointTopic(x.getKey().topic())) // Only perform relevant checkpoints filtered by "topic filter"
             .map(x -> checkpoint(group, x.getKey(), x.getValue()))
             .flatMap(o -> o.map(Stream::of).orElseGet(Stream::empty)) // do not emit checkpoints for partitions that don't have offset-syncs
             .filter(x -> x.downstreamOffset() >= 0)  // ignore offsets we cannot translate accurately
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
index 1391e7615d3..7220f559efc 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnectorTest.java
@@ -17,12 +17,15 @@
 package org.apache.kafka.connect.mirror;
 
 import org.apache.kafka.clients.admin.ConsumerGroupListing;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -123,13 +126,59 @@ public class MirrorCheckpointConnectorTest {
         Collection<ConsumerGroupListing> groups = Arrays.asList(
                 new ConsumerGroupListing("g1", true),
                 new ConsumerGroupListing("g2", false));
+        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+        offsets.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
         doReturn(groups).when(connector).listConsumerGroups();
-        doReturn(true).when(connector).shouldReplicate(anyString());
+        doReturn(true).when(connector).shouldReplicateByTopicFilter(anyString());
+        doReturn(true).when(connector).shouldReplicateByGroupFilter(anyString());
+        doReturn(offsets).when(connector).listConsumerGroupOffsets(anyString());
         List<String> groupFound = connector.findConsumerGroups();
 
         Set<String> expectedGroups = groups.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toSet());
         assertEquals(expectedGroups, new HashSet<>(groupFound),
                 "Expected groups are not the same as findConsumerGroups");
+
+        doReturn(false).when(connector).shouldReplicateByTopicFilter(anyString());
+        List<String> topicFilterGroupFound = connector.findConsumerGroups();
+        assertEquals(Collections.emptyList(), topicFilterGroupFound);
+    }
+
+    @Test
+    public void testFindConsumerGroupsInCommonScenarios() throws Exception {
+        MirrorConnectorConfig config = new MirrorConnectorConfig(makeProps());
+        MirrorCheckpointConnector connector = new MirrorCheckpointConnector(Collections.emptyList(), config);
+        connector = spy(connector);
+        // Stub four groups with different committed topics, then check which ones findConsumerGroups() keeps.
+        Collection<ConsumerGroupListing> groups = Arrays.asList(
+                new ConsumerGroupListing("g1", true),
+                new ConsumerGroupListing("g2", false),
+                new ConsumerGroupListing("g3", false),
+                new ConsumerGroupListing("g4", false));
+        Map<TopicPartition, OffsetAndMetadata> offsetsForGroup1 = new HashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> offsetsForGroup2 = new HashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> offsetsForGroup3 = new HashMap<>();
+        Map<TopicPartition, OffsetAndMetadata> offsetsForGroup4 = new HashMap<>();
+        offsetsForGroup1.put(new TopicPartition("t1", 0), new OffsetAndMetadata(0));
+        offsetsForGroup1.put(new TopicPartition("t2", 0), new OffsetAndMetadata(0));
+        offsetsForGroup2.put(new TopicPartition("t2", 0), new OffsetAndMetadata(0));
+        offsetsForGroup2.put(new TopicPartition("t3", 0), new OffsetAndMetadata(0));
+        offsetsForGroup3.put(new TopicPartition("t3", 0), new OffsetAndMetadata(0));
+        offsetsForGroup4.put(new TopicPartition("t3", 0), new OffsetAndMetadata(0));
+        doReturn(groups).when(connector).listConsumerGroups();
+        doReturn(false).when(connector).shouldReplicateByTopicFilter("t1");
+        doReturn(true).when(connector).shouldReplicateByTopicFilter("t2"); // t2 is the only topic the stubbed topic filter accepts
+        doReturn(false).when(connector).shouldReplicateByTopicFilter("t3");
+        doReturn(true).when(connector).shouldReplicateByGroupFilter("g1");
+        doReturn(true).when(connector).shouldReplicateByGroupFilter("g2");
+        doReturn(true).when(connector).shouldReplicateByGroupFilter("g3");
+        doReturn(false).when(connector).shouldReplicateByGroupFilter("g4"); // g4 is the only group the stubbed group filter rejects
+        doReturn(offsetsForGroup1).when(connector).listConsumerGroupOffsets("g1");
+        doReturn(offsetsForGroup2).when(connector).listConsumerGroupOffsets("g2");
+        doReturn(offsetsForGroup3).when(connector).listConsumerGroupOffsets("g3");
+        doReturn(offsetsForGroup4).when(connector).listConsumerGroupOffsets("g4");
+        // Expected: g1 and g2 (accepted group with an offset on t2); g3 only has offsets on t3; g4 is rejected by the group filter.
+        List<String> groupFound = connector.findConsumerGroups();
+        assertEquals(Arrays.asList("g1", "g2"), groupFound); // JUnit argument order is (expected, actual)
     }
 
 }