Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/17 22:25:43 UTC

[kafka] branch trunk updated: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a54a34a11c1 KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)
a54a34a11c1 is described below

commit a54a34a11c1c867ff62a7234334cad5139547fd7
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Fri Feb 17 14:25:17 2023 -0800

    KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)
    
    KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2
    
    KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2
    
    KAFKA-12566: Fix flaky MirrorMaker 2 integration tests
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../connect/mirror/MirrorCheckpointConfig.java     |   6 +
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  17 +-
 .../kafka/connect/mirror/OffsetSyncStore.java      | 119 +++++++---
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  32 ++-
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  |  86 +++++--
 .../IdentityReplicationIntegrationTest.java        |  47 ++--
 .../MirrorConnectorsIntegrationBaseTest.java       | 263 ++++++++++++++++-----
 ...MirrorConnectorsIntegrationExactlyOnceTest.java |   3 -
 ...irrorConnectorsIntegrationTransactionsTest.java |  66 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  24 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  |   3 +-
 11 files changed, 507 insertions(+), 159 deletions(-)
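
A rough worked example of the negative-lag bug (KAFKA-12468) that the OffsetSyncStore change
below addresses, with illustrative numbers that are not taken from the commit: suppose the latest
offset sync pair for a partition is (upstream=100, downstream=200), a consumer group has committed
upstream offset 150, and only 30 of the 50 upstream offsets after the sync correspond to replicated
records (the rest were consumed by transaction markers or filtered records), so the downstream log
end is 230.

    Old translation (dead-reckoning): 200 + (150 - 100) = 250  ->  lag = 230 - 250 = -20
    New translation (step at most 1): 200 + 1            = 201  ->  lag = 230 - 201 =  29

The new rule may cause some re-reading after failover, but it can never point a group past the
downstream log end, so downstream lag stays non-negative.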

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
index e21d22af1d4..122d8ad1e7f 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConfig.java
@@ -150,6 +150,12 @@ public class MirrorCheckpointConfig extends MirrorConnectorConfig {
                 : targetConsumerConfig();
     }
 
+    Map<String, Object> offsetSyncsTopicAdminConfig() {
+        return SOURCE_CLUSTER_ALIAS_DEFAULT.equals(offsetSyncsTopicLocation())
+                ? sourceAdminConfig()
+                : targetAdminConfig();
+    }
+
     Duration consumerPollTimeout() {
         return Duration.ofMillis(getLong(CONSUMER_POLL_TIMEOUT_MILLIS));
     }
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 95b7b5a2bda..9f5a4b00e69 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -102,10 +102,13 @@ public class MirrorCheckpointTask extends SourceTask {
         idleConsumerGroupsOffset = new HashMap<>();
         checkpointsPerConsumerGroup = new HashMap<>();
         scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
-        scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
-                                    "refreshing idle consumers group offsets at target cluster");
-        scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
-                                          "sync idle consumer group offset from source to target");
+        scheduler.execute(() -> {
+            offsetSyncStore.start();
+            scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
+                    "refreshing idle consumers group offsets at target cluster");
+            scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
+                    "sync idle consumer group offset from source to target");
+        }, "starting offset sync store");
     }
 
     @Override
@@ -136,7 +139,11 @@ public class MirrorCheckpointTask extends SourceTask {
         try {
             long deadline = System.currentTimeMillis() + interval.toMillis();
             while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+                Thread.sleep(pollTimeout.toMillis());
+            }
+            if (stopping) {
+                // we are stopping, return early.
+                return null;
             }
             List<SourceRecord> records = new ArrayList<>();
             for (String group : consumerGroups) {
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index 1cfdb1d265c..0169446aa04 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -16,65 +16,124 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    protected volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorCheckpointConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
+            admin = new TopicAdmin(
+                    config.offsetSyncsTopicAdminConfig().get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.forwardingAdmin(config.offsetSyncsTopicAdminConfig()));
+            store = createBackingStore(config, consumer, admin);
+        } catch (Throwable t) {
+            Utils.closeQuietly(consumer, "consumer for offset syncs");
+            Utils.closeQuietly(admin, "admin client for offset syncs");
+            throw t;
+        }
+        this.admin = admin;
+        this.backingStore = store;
     }
 
-    // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorCheckpointConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
+        return new KafkaBasedLog<byte[], byte[]>(
+                config.offsetSyncsTopic(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                () -> admin,
+                (error, record) -> this.handleRecord(record),
+                Time.SYSTEM,
+                ignored -> {
+                }
+        ) {
+            @Override
+            protected Producer<byte[], byte[]> createProducer() {
+                return null;
+            }
+
+            @Override
+            protected Consumer<byte[], byte[]> createConsumer() {
+                return consumer;
+            }
+
+            @Override
+            protected boolean readPartition(TopicPartition topicPartition) {
+                return topicPartition.partition() == 0;
+            }
+        };
+    }
+
+    OffsetSyncStore() {
+        this.admin = null;
+        this.backingStore = null;
+    }
+
+    /**
+     * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
+     */
+    public void start() {
+        backingStore.start();
+        readToEnd = true;
     }
 
     OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+        if (!readToEnd) {
+            // If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
+            // This prevents emitting stale offsets while initially reading the offset syncs topic.
+            return OptionalLong.empty();
+        }
         Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
         if (offsetSync.isPresent()) {
             if (offsetSync.get().upstreamOffset() > upstreamOffset) {
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
+            // If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
+            // downstream offset past the offset sync itself. This is because we know that future records must appear
+            // ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
+            // will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
+            // This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
+            // This may cause re-reading of records depending on the age of the offset sync.
+            // s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
+            // source |-s?????r???g-|
+            //          |  ______/
+            //          | /
+            //          vv
+            // target |-sg----r-----|
+            long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
             return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
         } else {
             return OptionalLong.empty();
         }
     }
 
-    // poll and handle records
-    synchronized void update(Duration pollTimeout) {
-        try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
-            // swallow
-        }
-    }
-
-    public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+    @Override
+    public void close() {
+        Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
+        Utils.closeQuietly(admin, "admin client for offset syncs");
     }
 
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
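
The upstreamStep change above is the core of the KAFKA-12468 fix: a translated offset is allowed to
advance at most one position past the sync pair, so it can never overshoot the downstream log end.
A minimal, self-contained sketch of the rule (hypothetical class and values, not code from this
commit):

    import java.util.OptionalLong;

    // Sketch of the translation rule; syncUpstream/syncDownstream stand in for the
    // latest OffsetSync pair stored for the partition, groupOffset for the group's
    // committed upstream offset.
    public class TranslationRuleSketch {
        static OptionalLong translate(long syncUpstream, long syncDownstream, long groupOffset) {
            if (syncUpstream > groupOffset) {
                return OptionalLong.of(-1L);                 // sync is newer than the group: too old to translate
            }
            long step = groupOffset == syncUpstream ? 0 : 1; // advance at most one offset past the sync
            return OptionalLong.of(syncDownstream + step);
        }

        public static void main(String[] args) {
            System.out.println(translate(100, 200, 100));    // OptionalLong[200]  exact match
            System.out.println(translate(100, 200, 150));    // OptionalLong[201]  never 250 as dead-reckoning gave
            System.out.println(translate(100, 200, 5));      // OptionalLong[-1]   older than any known sync
        }
    }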
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 20735cd2334..500cb6c131a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -52,11 +52,16 @@ public class MirrorCheckpointTaskTest {
 
     @Test
     public void testCheckpoint() {
+        long t1UpstreamOffset = 3L;
+        long t1DownstreamOffset = 4L;
+        long t2UpstreamOffset = 7L;
+        long t2DownstreamOffset = 8L;
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
-        offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
-        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+        offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
+        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
         Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
             new OffsetAndMetadata(10, null));
         assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public class MirrorCheckpointTaskTest {
                 "checkpoint group9 sourcePartition failed");
         assertEquals(10, checkpoint1.upstreamOffset(),
                 "checkpoint group9 upstreamOffset failed");
-        assertEquals(11, checkpoint1.downstreamOffset(),
+        assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
                 "checkpoint group9 downstreamOffset failed");
         assertEquals(123L, sourceRecord1.timestamp().longValue(),
                 "checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public class MirrorCheckpointTaskTest {
                 "checkpoint group11 sourcePartition failed");
         assertEquals(12, checkpoint2.upstreamOffset(),
                 "checkpoint group11 upstreamOffset failed");
-        assertEquals(13, checkpoint2.downstreamOffset(),
+        assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
                 "checkpoint group11 downstreamOffset failed");
         assertEquals(234L, sourceRecord2.timestamp().longValue(),
                     "checkpoint group11 timestamp failed");
+        Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
+                new OffsetAndMetadata(7, null));
+        assertTrue(optionalCheckpoint3.isPresent());
+        Checkpoint checkpoint3 = optionalCheckpoint3.get();
+        SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
+        assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
+                "checkpoint group13 topic5 failed");
+        assertEquals("group13", checkpoint3.consumerGroupId(),
+                "checkpoint group13 consumerGroupId failed");
+        assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
+                "checkpoint group13 sourcePartition failed");
+        assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
+                "checkpoint group13 upstreamOffset failed");
+        assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
+                "checkpoint group13 downstreamOffset failed");
+        assertEquals(234L, sourceRecord3.timestamp().longValue(),
+                "checkpoint group13 timestamp failed");
     }
 
     @Test
@@ -150,6 +172,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testNoCheckpointForTopicWithoutOffsetSyncs() {
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
                 new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
         offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
         offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 9224a088081..163e5b72250 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.OptionalLong;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
     static class FakeOffsetSyncStore extends OffsetSyncStore {
 
         FakeOffsetSyncStore() {
-            super(null, null);
+            super();
+        }
+
+        @Override
+        public void start() {
+            // do not call super to avoid NPE without a KafkaBasedLog.
+            readToEnd = true;
         }
 
         void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ public class OffsetSyncStoreTest {
 
     @Test
     public void testOffsetTranslation() {
-        FakeOffsetSyncStore store = new FakeOffsetSyncStore();
-
-        store.sync(tp, 100, 200);
-        assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
-                "Failure in translating downstream offset 250");
-
-        // Translate exact offsets
-        store.sync(tp, 150, 251);
-        assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
-                "Failure in translating exact downstream offset 251");
-
-        // Use old offset (5) prior to any sync -> can't translate
-        assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
-                "Expected old offset to not translate");
-
-        // Downstream offsets reset
-        store.sync(tp, 200, 10);
-        assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
-                "Failure in resetting translation of downstream offset");
-
-        // Upstream offsets reset
-        store.sync(tp, 20, 20);
-        assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
-                "Failure in resetting translation of upstream offset");
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+
+            // Emit synced downstream offset without dead-reckoning
+            store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+
+            // Translate exact offsets
+            store.sync(tp, 150, 251);
+            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+
+            // Use old offset (5) prior to any sync -> can't translate
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+
+            // Downstream offsets reset
+            store.sync(tp, 200, 10);
+            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+
+            // Upstream offsets reset
+            store.sync(tp, 20, 20);
+            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+        }
+    }
+
+    @Test
+    public void testNoTranslationIfStoreNotStarted() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            // no offsets exist and store is not started
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // read a sync during startup
+            store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // After the store is started all offsets are visible
+            store.start();
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+        }
+    }
+
+    @Test
+    public void testNoTranslationIfNoOffsetSync() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+        }
     }
 }
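
The readToEnd gate exercised by testNoTranslationIfStoreNotStarted is the KAFKA-13659 side of the
change: until the store has replayed the whole offset-syncs topic, no translation is offered, so a
checkpoint cannot be computed from a partially-read (stale) sync. An illustrative sequence, not
taken from the commit:

    1. An earlier run translates group G on partition P to downstream offset 900 and syncs it.
    2. The checkpoint task restarts; so far the store has only replayed an old sync
       (upstream=100, downstream=200) for P.
    3. Without the gate, the next checkpoint for G would be derived from that stale pair and
       could move G's downstream offset back toward 200.
    4. With the gate, translateDownstream() returns OptionalLong.empty() until start() finishes
       the read-to-end, so no checkpoint is emitted from stale state.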
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index dd683f1acfe..17aa9ebc142 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.mirror.MirrorClient;
 import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
 import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -107,11 +106,8 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
                 "Checkpoints were not emitted downstream to backup cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        assertTrue(backupOffsets.containsKey(
-                new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -221,18 +217,19 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         topicShouldNotBeCreated(primary, "backup.test-topic-1");
         waitForTopicCreated(backup, "test-topic-1");
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-                consumerProps, "test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+                consumerProps, "test-topic-1")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
-                consumerGroupName, NUM_RECORDS_PRODUCED, true);
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -244,22 +241,24 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {
+                "group.id", consumerGroupName), "test-topic-2")) {
             // we need to wait for consuming all the records for MM2 replicating the expected offsets
             waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "test-topic-1", "test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
-                consumerGroupName, NUM_RECORDS_PRODUCED, true);
+            waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
+
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
     /*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 27da7054b67..82772ceb06e 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,6 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
 import org.apache.kafka.connect.mirror.SourceAndTarget;
 import org.apache.kafka.connect.mirror.Checkpoint;
 import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
-import org.apache.kafka.connect.mirror.ReplicationPolicy;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -54,6 +55,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Tag;
@@ -93,6 +96,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     private static final int OFFSET_SYNC_DURATION_MS = 30_000;
     private static final int TOPIC_SYNC_DURATION_MS = 60_000;
     private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
+    private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1_000;
     private static final int NUM_WORKERS = 3;
     protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
     protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
@@ -118,8 +122,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected Properties backupBrokerProps = new Properties();
     protected Map<String, String> primaryWorkerProps = new HashMap<>();
     protected Map<String, String> backupWorkerProps = new HashMap<>();
-    protected boolean exactOffsetTranslation = true;
-    
+
     @BeforeEach
     public void startClusters() throws Exception {
         startClusters(new HashMap<String, String>() {{
@@ -303,13 +306,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
             "Checkpoints were not emitted downstream to backup cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        for (int i = 0; i < NUM_PARTITIONS; i++) {
-            assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
-                   "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
-        }
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -325,11 +323,10 @@ public class MirrorConnectorsIntegrationBaseTest {
                 "Checkpoints were not emitted upstream to primary cluster.");
         }
 
-        waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+                primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS));
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
  
         primaryClient.close();
         backupClient.close();
@@ -416,7 +413,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             assertEquals(0, offset.offset(), "Offset of last partition is not zero");
         }
     }
-    
+
     @Test
     public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
         produceMessages(primary, "test-topic-1");
@@ -447,18 +444,19 @@ public class MirrorConnectorsIntegrationBaseTest {
         topicShouldNotBeCreated(primary, "backup.test-topic-1");
         waitForTopicCreated(backup, "primary.test-topic-1");
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-            consumerProps, "primary.test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+            consumerProps, "primary.test-topic-1")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"), 
-            consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -470,22 +468,24 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {
+                "group.id", consumerGroupName), "test-topic-2")) {
             // we need to wait for consuming all the records for MM2 replicating the expected offsets
             waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+
+            waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), 
-            consumerGroupName, NUM_RECORDS_PRODUCED, exactOffsetTranslation);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
     }
 
     @Test
@@ -502,10 +502,14 @@ public class MirrorConnectorsIntegrationBaseTest {
         // Ensure the offset syncs topic is created in the target cluster
         waitForTopicCreated(backup, "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal");
 
+        String consumerGroupName = "consumer-group-syncs-on-target";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
+        warmUpConsumer(consumerProps);
+
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
 
         // Check offsets are pushed to the checkpoint topic
         Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
@@ -523,6 +527,8 @@ public class MirrorConnectorsIntegrationBaseTest {
             "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
         );
 
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+
         // Ensure no offset-syncs topics have been created on the primary cluster
         Set<String> primaryTopics = primary.kafka().createAdminClient().listTopics().names().get();
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"));
@@ -575,6 +581,17 @@ public class MirrorConnectorsIntegrationBaseTest {
         // Send some records to test-topic-no-checkpoints in the source cluster
         produceMessages(primary, "test-topic-no-checkpoints");
 
+        try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream()
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            e -> new OffsetAndMetadata(e.getValue())
+                    ));
+            consumer.commitSync(offsetsToCommit);
+        }
+
         waitForCondition(() -> {
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
@@ -583,6 +600,33 @@ public class MirrorConnectorsIntegrationBaseTest {
         }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to backup cluster");
     }
 
+    @Test
+    public void testRestartReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-restart";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        warmUpConsumer(consumerProps);
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, NUM_RECORDS_PRODUCED);
+        restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+        Thread.sleep(5000);
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, 2 * NUM_RECORDS_PRODUCED);
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+    }
+
+
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
     }
@@ -645,6 +689,12 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+        for (Class<? extends Connector> connector : connectorClasses) {
+            connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
+        }
+    }
+
     /*
      * wait for the topic created on the cluster
      */
@@ -690,7 +740,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected void produceMessages(EmbeddedConnectCluster cluster, String topicName) {
         Map<String, String> recordSent = generateRecords(NUM_RECORDS_PRODUCED);
         for (Map.Entry<String, String> entry : recordSent.entrySet()) {
-            cluster.kafka().produce(topicName, entry.getKey(), entry.getValue());
+            produce(cluster.kafka(), topicName, null, entry.getKey(), entry.getValue());
         }
     }
 
@@ -701,16 +751,70 @@ public class MirrorConnectorsIntegrationBaseTest {
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, may be null.
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords, boolean exactOffsetTranslation)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords
+    ) throws InterruptedException {
+        int expectedRecords = numRecords * topics.size();
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.put("isolation.level", "read_committed");
+        consumerProps.put("auto.offset.reset", "earliest");
+        Map<TopicPartition, Long> lastOffset = new HashMap<>();
+        try (Consumer<byte[], byte[]> consumer = connect.kafka().createConsumerAndSubscribeTo(consumerProps, topics.toArray(new String[0]))) {
+            final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+            waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                records.forEach(record -> lastOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()));
+                return expectedRecords == totalConsumedRecords.addAndGet(records.count());
+            }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all records in time");
+        }
         try (Admin adminClient = connect.kafka().createAdminClient()) {
             List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
@@ -718,26 +822,54 @@ public class MirrorConnectorsIntegrationBaseTest {
                     tps.add(new TopicPartition(topic, partitionIndex));
                 }
             }
-            long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
-                    adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
-                    .mapToLong(OffsetAndMetadata::offset).sum();
-
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
-                boolean totalOffsetsMatch = exactOffsetTranslation
-                        ? totalOffsets == expectedTotalOffsets
-                        : totalOffsets >= expectedTotalOffsets;
-                // make sure the consumer group offsets are synced to expected number
-                return totalOffsetsMatch && consumerGroupOffsetTotal > 0;
+                        adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
+                Map<TopicPartition, OffsetSpec> endOffsetRequest = tps.stream()
+                        .collect(Collectors.toMap(Function.identity(), ignored -> OffsetSpec.latest()));
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(endOffsetRequest).all().get();
+
+                for (TopicPartition tp : tps) {
+                    assertTrue(consumerGroupOffsets.containsKey(tp),
+                            "TopicPartition " + tp + " does not have translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp),
+                            "TopicPartition " + tp + " does not have fully-translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                            "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
+                }
+                return true;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
 
+    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+        TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
+        try (Consumer<byte[], byte[]> backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "auto.offset.reset", "earliest"), checkpointTopic)) {
+            Map<String, Map<TopicPartition, Checkpoint>> checkpointsByGroup = new HashMap<>();
+            long deadline = System.currentTimeMillis() + CHECKPOINT_DURATION_MS;
+            do {
+                ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                    Map<TopicPartition, Checkpoint> lastCheckpoints = checkpointsByGroup.computeIfAbsent(
+                            checkpoint.consumerGroupId(),
+                            ignored -> new HashMap<>());
+                    Checkpoint lastCheckpoint = lastCheckpoints.getOrDefault(checkpoint.topicPartition(), checkpoint);
+                    assertTrue(checkpoint.downstreamOffset() >= lastCheckpoint.downstreamOffset(),
+                            "Checkpoint was non-monotonic for "
+                                    + checkpoint.consumerGroupId()
+                                    + ": "
+                                    + checkpoint.topicPartition());
+                    lastCheckpoints.put(checkpoint.topicPartition(), checkpoint);
+                }
+            } while (backupConsumer.currentLag(checkpointTopicPartition).orElse(1) > 0 && System.currentTimeMillis() < deadline);
+            assertEquals(0, backupConsumer.currentLag(checkpointTopicPartition).orElse(1), "Unable to read all checkpoints within " + CHECKPOINT_DURATION_MS + "ms");
+        }
+    }
+
     /*
      * make sure the consumer to consume expected number of records
      */
@@ -760,7 +892,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("max.tasks", "10");
         mm2Props.put("groups", "consumer-group-.*");
         mm2Props.put("sync.topic.acls.enabled", "false");
-        mm2Props.put("emit.checkpoints.interval.seconds", "1");
+        mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
         mm2Props.put("emit.heartbeats.interval.seconds", "1");
         mm2Props.put("refresh.topics.interval.seconds", "1");
         mm2Props.put("refresh.groups.interval.seconds", "1");
@@ -771,7 +903,10 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("offset.storage.replication.factor", "1");
         mm2Props.put("status.storage.replication.factor", "1");
         mm2Props.put("replication.factor", "1");
-        
+        // Sync offsets as soon as possible to ensure the final record in a finite test has its offset translated.
+        mm2Props.put("offset.lag.max", "0");
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
         return mm2Props;
     }
     
@@ -802,14 +937,14 @@ public class MirrorConnectorsIntegrationBaseTest {
      * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
      */
     protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
-        dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
+        try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
+        try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
     }
 
     /*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
index a50b21bd58b..0081dcb3688 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationExactlyOnceTest.java
@@ -43,9 +43,6 @@ public class MirrorConnectorsIntegrationExactlyOnceTest extends MirrorConnectors
             brokerProps.put("transaction.state.log.replication.factor", "1");
             brokerProps.put("transaction.state.log.min.isr", "1");
         }
-        // Transaction marker records will cause translated offsets to not match
-        // between source and target
-        exactOffsetTranslation = false;
         super.startClusters();
     }
 
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
new file mode 100644
index 00000000000..6ac09ef32bb
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer,
+ * which interleaves transaction commit messages into the source topic; these are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+    private Map<String, Object> producerProps = new HashMap<>();
+
+    @BeforeEach
+    @Override
+    public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+        backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+        primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+        backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+        super.startClusters();
+    }
+
+    /**
+     * Produce records with a short-lived transactional producer to interleave transaction markers in the topic.
+     */
+    @Override
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes());
+        try (Producer<byte[], byte[]> producer = cluster.createProducer(producerProps)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(msg).get(120000, TimeUnit.MILLISECONDS);
+            producer.commitTransaction();
+        } catch (Exception e) {
+            throw new KafkaException("Could not produce message: " + msg, e);
+        }
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 50c0e6936a1..e6d820539ea 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -257,8 +257,16 @@ public class KafkaBasedLog<K, V> {
                     " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
                     " this is your first use of the topic it may have taken too long to create.");
 
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        for (PartitionInfo partition : partitionInfos) {
+            TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition());
+            if (readPartition(topicPartition)) {
+                partitions.add(topicPartition);
+            }
+        }
+        if (partitions.isEmpty()) {
+            throw new ConnectException("Some partitions for " + topic + " exist, but no partitions matched the " +
+                    "required filter.");
+        }
         partitionCount = partitions.size();
         consumer.assign(partitions);
 
@@ -392,6 +400,18 @@ public class KafkaBasedLog<K, V> {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    /**
+     * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
+     * for every partition found in the log's backing topic.
+     * <p>This method can be overridden by subclasses when only a subset of the assigned partitions
+     * should be read into memory. By default, all partitions are read.
+     * @param topicPartition A topic partition which could be read by this log.
+     * @return true if the partition should be read by this log, false if its contents should be ignored.
+     */
+    protected boolean readPartition(TopicPartition topicPartition) {
+        return true;
+    }
+
     private void poll(long timeoutMs) {
         try {
             ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index f9defc77ca2..691852430a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -282,8 +282,7 @@ public class TopicAdmin implements AutoCloseable {
         this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig));
     }
 
-    // visible for testing
-    TopicAdmin(Object bootstrapServers, Admin adminClient) {
+    public TopicAdmin(Object bootstrapServers, Admin adminClient) {
         this(bootstrapServers, adminClient, true);
     }