Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 14:26:12 UTC

[kafka] branch 3.3 updated (cec628a2e70 -> 69a2817a059)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a change to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from cec628a2e70 KAFKA-14731: Upgrade ZooKeeper to 3.6.4 (#13273)
     new 7c78f3f68e8 KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)
     new 4597e954d91 KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)
     new 51ee89a598b KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)
     new 69a2817a059 KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  31 ++-
 .../kafka/connect/mirror/MirrorSourceTask.java     | 109 ++++++---
 .../kafka/connect/mirror/OffsetSyncStore.java      | 116 +++++++---
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  42 +++-
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 194 +++++++++++++++-
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  |  86 +++++--
 .../IdentityReplicationIntegrationTest.java        |  45 ++--
 .../MirrorConnectorsIntegrationBaseTest.java       | 257 ++++++++++++++++-----
 ...irrorConnectorsIntegrationTransactionsTest.java |  66 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  24 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  |   3 +-
 11 files changed, 781 insertions(+), 192 deletions(-)
 create mode 100644 connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java


[kafka] 01/04: KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)

Posted by ce...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7c78f3f68e83eb9cef3b147951f9108c4bba8b80
Author: emilnkrastev <em...@gmail.com>
AuthorDate: Tue Jan 10 16:46:25 2023 +0200

    KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)
    
    Reviewers: Greg Harris <gr...@aiven.io>, Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     |  23 ++-
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 173 ++++++++++++++++++++-
 .../MirrorConnectorsIntegrationBaseTest.java       |   6 +-
 3 files changed, 192 insertions(+), 10 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index ec1e15cda7e..4e714a6bf46 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -69,13 +69,19 @@ public class MirrorSourceTask extends SourceTask {
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorMetrics metrics, String sourceClusterAlias,
-                     ReplicationPolicy replicationPolicy, long maxOffsetLag) {
+                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer,
+                     Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates,
+                     String offsetSyncsTopic) {
         this.consumer = consumer;
         this.metrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
         this.replicationPolicy = replicationPolicy;
         this.maxOffsetLag = maxOffsetLag;
         consumerAccess = new Semaphore(1);
+        this.offsetProducer = producer;
+        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
+        this.partitionStates = partitionStates;
+        this.offsetSyncsTopic = offsetSyncsTopic;
     }
 
     @Override
@@ -197,16 +203,18 @@ public class MirrorSourceTask extends SourceTask {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
+                partitionState.reset();
+            }
         }
     }
 
     // sends OffsetSync record upstream to internal offsets topic
-    private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
+    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
             long downstreamOffset) {
         if (!outstandingOffsetSyncs.tryAcquire()) {
             // Too many outstanding offset syncs.
-            return;
+            return false;
         }
         OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
@@ -220,6 +228,7 @@ public class MirrorSourceTask extends SourceTask {
             }
             outstandingOffsetSyncs.release();
         });
+        return true;
     }
  
     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
@@ -271,6 +280,7 @@ public class MirrorSourceTask extends SourceTask {
         long lastSyncUpstreamOffset = -1L;
         long lastSyncDownstreamOffset = -1L;
         long maxOffsetLag;
+        boolean shouldSyncOffsets;
 
         PartitionState(long maxOffsetLag) {
             this.maxOffsetLag = maxOffsetLag;
@@ -278,7 +288,6 @@ public class MirrorSourceTask extends SourceTask {
 
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            boolean shouldSyncOffsets = false;
             long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
             long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
             if (lastSyncDownstreamOffset == -1L
@@ -293,5 +302,9 @@ public class MirrorSourceTask extends SourceTask {
             previousDownstreamOffset = downstreamOffset;
             return shouldSyncOffsets;
         }
+
+        void reset() {
+            shouldSyncOffsets = false;
+        }
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index feb2f7fb6ba..bbd9ec3aff5 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -19,25 +19,38 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 public class MirrorSourceTaskTest {
@@ -51,8 +64,10 @@ public class MirrorSourceTaskTest {
         headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
         ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
             TimestampType.CREATE_TIME, 5, 6, key, value, headers, Optional.empty());
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
-                new DefaultReplicationPolicy(), 50);
+                new DefaultReplicationPolicy(), 50, producer, null, null, null);
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
         assertEquals("cluster7.topic1", sourceRecord.topic(),
                 "Failure on cluster7.topic1 consumerRecord serde");
@@ -77,15 +92,33 @@ public class MirrorSourceTaskTest {
         MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);
 
         assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
         assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(4, 153), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 205), "one past target offset");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 206), "upstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 207), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "downstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 4), "no sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(8, 7), "sync");
+        assertTrue(partitionState.update(10, 57), "sync");
+        partitionState.reset();
+        assertFalse(partitionState.update(11, 58), "sync");
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
     }
 
     @Test
@@ -94,15 +127,32 @@ public class MirrorSourceTaskTest {
 
         // if max offset lag is zero, should always emit offset syncs
         assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
         assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(8, 7), "zeroOffsetSync downStreamOffset 7 is incorrect");
+        assertTrue(partitionState.update(10, 57), "zeroOffsetSync downStreamOffset 57 is incorrect");
+        partitionState.reset();
+        assertTrue(partitionState.update(11, 58), "zeroOffsetSync downStreamOffset 58 is incorrect");
     }
 
     @Test
@@ -127,6 +177,8 @@ public class MirrorSourceTaskTest {
 
         @SuppressWarnings("unchecked")
         KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         when(consumer.poll(any())).thenReturn(consumerRecords);
 
         MirrorMetrics metrics = mock(MirrorMetrics.class);
@@ -134,7 +186,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
-                replicationPolicy, 50);
+                replicationPolicy, 50, producer, null, null, null);
         List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
 
         assertEquals(2, sourceRecords.size());
@@ -160,6 +212,121 @@ public class MirrorSourceTaskTest {
         }
     }
 
+    @Test
+    public void testCommitRecordWithNullMetadata() {
+        // Create a consumer mock
+        byte[] key1 = "abc".getBytes();
+        byte[] value1 = "fgh".getBytes();
+        String topicName = "test";
+        String headerKey = "key";
+        RecordHeaders headers = new RecordHeaders(new Header[] {
+            new RecordHeader(headerKey, "value".getBytes()),
+        });
+
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        MirrorMetrics metrics = mock(MirrorMetrics.class);
+
+        String sourceClusterName = "cluster1";
+        ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
+                replicationPolicy, 50, producer, null, null, null);
+
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
+                TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
+
+        // Expect that commitRecord will not throw an exception
+        mirrorSourceTask.commitRecord(sourceRecord, null);
+        verifyNoInteractions(producer);
+    }
+
+    @Test
+    public void testSendSyncEvent() {
+        byte[] recordKey = "key".getBytes();
+        byte[] recordValue = "value".getBytes();
+        int maxOffsetLag = 50;
+        int recordPartition = 0;
+        int recordOffset = 0;
+        int metadataOffset = 100;
+        String topicName = "topic";
+        String sourceClusterName = "sourceCluster";
+
+        RecordHeaders headers = new RecordHeaders();
+        ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        MirrorMetrics metrics = mock(MirrorMetrics.class);
+        Semaphore outstandingOffsetSyncs = new Semaphore(1);
+        PartitionState partitionState = new PartitionState(maxOffsetLag);
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
+                replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName);
+
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
+        partitionStates.put(sourceTopicPartition, partitionState);
+        RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+
+        ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class);
+        when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> {
+            producerCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(1)).send(any(), any());
+
+        recordOffset = 2;
+        metadataOffset = 102;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        // Do not release outstanding sync semaphore
+        doReturn(null).when(producer).send(any(), producerCallback.capture());
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(2)).send(any(), any());
+
+        // Do not send sync event
+        recordOffset = 4;
+        metadataOffset = 104;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(2)).send(any(), any());
+
+        // Should send sync event
+        recordOffset = 5;
+        metadataOffset = 150;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        producerCallback.getValue().onCompletion(null, null);
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(3)).send(any(), any());
+    }
+
     private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
         assertEquals(expectedHeaders.size(), taskHeaders.size());
         for (int i = 0; i < expectedHeaders.size(); i++) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index dfafdcbd8c6..3d448e9c150 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -299,8 +299,10 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS));
 
-        assertTrue(backupOffsets.containsKey(
-            new TopicPartition("primary.test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
+                   "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        }
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
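
To make the intent of the commit above easier to follow, here is a minimal, self-contained sketch of the pattern it applies, assuming hypothetical, simplified names rather than the real MirrorSourceTask API: the "a sync is needed" state is only cleared once the sync has actually been handed to the producer, so a sync that is skipped because too many are already in flight is retried on a later record instead of being lost.

    import java.util.concurrent.Semaphore;

    // Hypothetical, simplified sketch of the pattern above; not the real MirrorSourceTask API.
    class PartitionSyncSketch {
        private final Semaphore outstandingSyncs = new Semaphore(10);
        private boolean shouldSync = false;
        private long lastSyncedUpstream = -1L;

        void maybeSync(long upstreamOffset, long downstreamOffset) {
            // update() may raise the flag, but no longer clears it as a side effect
            if (update(upstreamOffset)) {
                // the flag is reset only once the sync has actually been dispatched
                if (trySend(upstreamOffset, downstreamOffset)) {
                    shouldSync = false;
                }
            }
        }

        private boolean update(long upstreamOffset) {
            if (upstreamOffset != lastSyncedUpstream) {
                shouldSync = true;            // a sync is due; stays true until a send succeeds
            }
            return shouldSync;
        }

        private boolean trySend(long upstreamOffset, long downstreamOffset) {
            if (!outstandingSyncs.tryAcquire()) {
                return false;                 // too many in flight; keep the flag and retry later
            }
            lastSyncedUpstream = upstreamOffset;
            // In the real task: offsetProducer.send(record, (metadata, error) -> outstandingSyncs.release());
            outstandingSyncs.release();       // released inline only because the send is stubbed out here
            return true;
        }
    }

Before this change, the "should sync" decision was recomputed from scratch on every update, so a sync skipped by the semaphore check could be silently lost until the lag condition triggered again.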


[kafka] 02/04: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)

Posted by ce...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 4597e954d91b05b64ff694cc4c2022713c266eba
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Mon Feb 6 04:53:58 2023 -0500

    KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gh...@gmail.com>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     | 96 ++++++++++++++--------
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 31 +++++--
 2 files changed, 88 insertions(+), 39 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 4e714a6bf46..b9dd470d013 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
@@ -62,6 +64,7 @@ public class MirrorSourceTask extends SourceTask {
     private ReplicationPolicy replicationPolicy;
     private MirrorMetrics metrics;
     private boolean stopping = false;
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
     private Semaphore outstandingOffsetSyncs;
     private Semaphore consumerAccess;
 
@@ -87,6 +90,7 @@ public class MirrorSourceTask extends SourceTask {
     @Override
     public void start(Map<String, String> props) {
         MirrorTaskConfig config = new MirrorTaskConfig(props);
+        pendingOffsetSyncs.clear();
         outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
         consumerAccess = new Semaphore(1);  // let one thread at a time access the consumer
         sourceClusterAlias = config.sourceClusterAlias();
@@ -111,7 +115,9 @@ public class MirrorSourceTask extends SourceTask {
 
     @Override
     public void commit() {
-        // nop
+        // Publish any offset syncs that we've queued up, but have not yet been able to publish
+        // (likely because we previously reached our limit for number of outstanding syncs)
+        firePendingOffsetSyncs();
     }
 
     @Override
@@ -176,59 +182,81 @@ public class MirrorSourceTask extends SourceTask {
  
     @Override
     public void commitRecord(SourceRecord record, RecordMetadata metadata) {
-        try {
-            if (stopping) {
-                return;
-            }
-            if (!metadata.hasOffset()) {
-                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
-                return;
-            }
-            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-            long latency = System.currentTimeMillis() - record.timestamp();
-            metrics.countRecord(topicPartition);
-            metrics.replicationLatency(topicPartition, latency);
-            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
-            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
-            long downstreamOffset = metadata.offset();
-            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
-        } catch (Throwable e) {
-            log.warn("Failure committing record.", e);
+        if (stopping) {
+            return;
+        }
+        if (metadata == null) {
+            log.debug("No RecordMetadata (source record was probably filtered out during transformation) -- can't sync offsets for {}.", record.topic());
+            return;
         }
+        if (!metadata.hasOffset()) {
+            log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
+            return;
+        }
+        TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
+        long latency = System.currentTimeMillis() - record.timestamp();
+        metrics.countRecord(topicPartition);
+        metrics.replicationLatency(topicPartition, latency);
+        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
+        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
+        long downstreamOffset = metadata.offset();
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.trace("No more pending offset syncs");
+                    return;
+                }
+                pendingOffsetSync = syncIterator.next();
+                if (!outstandingOffsetSyncs.tryAcquire()) {
+                    // Too many outstanding syncs
+                    log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
+                    return;
+                }
+                syncIterator.remove();
+            }
+            // Publish offset sync outside of synchronized block; we may have to
+            // wait for producer metadata to update before Producer::send returns
+            sendOffsetSync(pendingOffsetSync);
+            log.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition());
         }
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+    }
+
+    // sends OffsetSync record to internal offsets topic
+    private void sendOffsetSync(OffsetSync offsetSync) {
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
                 offsetSync.recordKey(), offsetSync.recordValue());
         offsetProducer.send(record, (x, e) -> {
             if (e != null) {
                 log.error("Failure sending offset sync.", e);
             } else {
-                log.trace("Sync'd offsets for {}: {}=={}", topicPartition,
-                    upstreamOffset, downstreamOffset);
+                log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(),
+                    offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
             }
             outstandingOffsetSyncs.release();
         });
-        return true;
     }
  
     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index bbd9ec3aff5..300a40087c6 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -283,7 +283,11 @@ public class MirrorSourceTaskTest {
         });
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(1)).send(any(), any());
 
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
         verify(producer, times(1)).send(any(), any());
 
         recordOffset = 2;
@@ -297,7 +301,11 @@ public class MirrorSourceTaskTest {
         doReturn(null).when(producer).send(any(), producerCallback.capture());
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(2)).send(any(), any());
 
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
         verify(producer, times(2)).send(any(), any());
 
         // Do not send sync event
@@ -309,22 +317,35 @@ public class MirrorSourceTaskTest {
                 recordValue.length, recordKey, recordValue, headers, Optional.empty()));
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        mirrorSourceTask.commit();
 
+        // We should not have dispatched any more syncs to the producer; there were too many already in flight
         verify(producer, times(2)).send(any(), any());
 
+        // Now the in-flight sync has been ack'd
+        producerCallback.getValue().onCompletion(null, null);
+        mirrorSourceTask.commit();
+        // We should dispatch the offset sync that was queued but previously not sent to the producer now
+        verify(producer, times(3)).send(any(), any());
+
+        // Ack the latest sync immediately
+        producerCallback.getValue().onCompletion(null, null);
+
         // Should send sync event
-        recordOffset = 5;
-        metadataOffset = 150;
+        recordOffset = 6;
+        metadataOffset = 106;
         recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
         sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
                 recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
                 recordValue.length, recordKey, recordValue, headers, Optional.empty()));
 
-        producerCallback.getValue().onCompletion(null, null);
-
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(4)).send(any(), any());
 
-        verify(producer, times(3)).send(any(), any());
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
+        verify(producer, times(4)).send(any(), any());
     }
 
     private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
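
A minimal sketch, using hypothetical names rather than the connector's API, of the queue-and-drain approach introduced in the diff above: syncs are staged in an insertion-ordered map keyed by partition so only the latest sync per partition is kept, a semaphore bounds how many are in flight, and anything that cannot be sent now is retried from commit().

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.concurrent.Semaphore;
    import java.util.function.BiConsumer;

    // Hypothetical sketch of the queue-and-drain pattern applied above; not the connector's API.
    class PendingSyncQueueSketch<K, V> {
        private final Map<K, V> pending = new LinkedHashMap<>();  // newest value per key, insertion-ordered
        private final Semaphore outstanding = new Semaphore(10);  // bounds in-flight sends

        // Record-commit path: stage the newest sync for this key, replacing any older one.
        synchronized void enqueue(K key, V value) {
            pending.put(key, value);
        }

        // Called from both the record-commit path and commit(): drain as much as the semaphore allows.
        void drain(BiConsumer<V, Runnable> sender) {
            while (true) {
                V next;
                synchronized (this) {
                    Iterator<V> it = pending.values().iterator();
                    if (!it.hasNext()) {
                        return;                       // nothing left to send
                    }
                    if (!outstanding.tryAcquire()) {
                        return;                       // too many in flight; retried on the next commit()
                    }
                    next = it.next();
                    it.remove();
                }
                // Send outside the lock; the completion callback must release the permit.
                sender.accept(next, outstanding::release);
            }
        }
    }

In the connector itself, the role of sender.accept corresponds to producer.send(record, callback), and invoking the drain from commit() is what picks up syncs that were previously throttled by the semaphore.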


[kafka] 04/04: KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)

Posted by ce...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 69a2817a0592f02f219909856316e26a4efd21f0
Author: Greg Harris <gr...@aiven.io>
AuthorDate: Fri Feb 17 14:25:17 2023 -0800

    KAFKA-12468, KAFKA-13659, KAFKA-12566: Fix MM2 causing negative downstream lag, syncing stale offsets, and flaky integration tests (#13178)
    
    KAFKA-12468: Fix negative lag on down consumer groups synced by MirrorMaker 2
    
    KAFKA-13659: Stop syncing consumer groups with stale offsets in MirrorMaker 2
    
    KAFKA-12566: Fix flaky MirrorMaker 2 integration tests
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/mirror/MirrorCheckpointTask.java |  17 +-
 .../kafka/connect/mirror/OffsetSyncStore.java      | 116 ++++++---
 .../connect/mirror/MirrorCheckpointTaskTest.java   |  32 ++-
 .../kafka/connect/mirror/OffsetSyncStoreTest.java  |  86 +++++--
 .../IdentityReplicationIntegrationTest.java        |  45 ++--
 .../MirrorConnectorsIntegrationBaseTest.java       | 259 ++++++++++++++++-----
 ...irrorConnectorsIntegrationTransactionsTest.java |  66 ++++++
 .../apache/kafka/connect/util/KafkaBasedLog.java   |  24 +-
 .../org/apache/kafka/connect/util/TopicAdmin.java  |   3 +-
 9 files changed, 497 insertions(+), 151 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 29483c126b3..02dcdf0f43e 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -101,10 +101,13 @@ public class MirrorCheckpointTask extends SourceTask {
         idleConsumerGroupsOffset = new HashMap<>();
         checkpointsPerConsumerGroup = new HashMap<>();
         scheduler = new Scheduler(MirrorCheckpointTask.class, config.adminTimeout());
-        scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
-                                    "refreshing idle consumers group offsets at target cluster");
-        scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
-                                          "sync idle consumer group offset from source to target");
+        scheduler.execute(() -> {
+            offsetSyncStore.start();
+            scheduler.scheduleRepeating(this::refreshIdleConsumerGroupOffset, config.syncGroupOffsetsInterval(),
+                    "refreshing idle consumers group offsets at target cluster");
+            scheduler.scheduleRepeatingDelayed(this::syncGroupOffset, config.syncGroupOffsetsInterval(),
+                    "sync idle consumer group offset from source to target");
+        }, "starting offset sync store");
     }
 
     @Override
@@ -135,7 +138,11 @@ public class MirrorCheckpointTask extends SourceTask {
         try {
             long deadline = System.currentTimeMillis() + interval.toMillis();
             while (!stopping && System.currentTimeMillis() < deadline) {
-                offsetSyncStore.update(pollTimeout);
+                Thread.sleep(pollTimeout.toMillis());
+            }
+            if (stopping) {
+                // we are stopping, return early.
+                return null;
             }
             List<SourceRecord> records = new ArrayList<>();
             for (String group : consumerGroups) {
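
A rough sketch of the startup pattern the hunk above adopts, assuming a plain ExecutorService in place of MirrorMaker's Scheduler: the blocking read-to-end of the offset syncs topic is moved onto a background thread so that Task.start() returns promptly, and the repeating jobs are registered only once the store is ready.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    // Hypothetical sketch; the real task uses MirrorMaker's Scheduler, not a bare ExecutorService.
    class DeferredStartupSketch {
        private final ExecutorService scheduler = Executors.newSingleThreadExecutor();
        private volatile boolean storeReady = false;

        void start() {
            // start() returns immediately; the blocking read-to-end happens on the scheduler thread.
            scheduler.submit(() -> {
                startOffsetSyncStore();       // potentially slow: reads the offset syncs topic to the end
                storeReady = true;
                scheduleRepeatingJobs();      // periodic work is registered only once the store is usable
            });
        }

        boolean ready() {
            return storeReady;
        }

        private void startOffsetSyncStore() { /* blocking read-to-end of the syncs topic */ }

        private void scheduleRepeatingJobs() { /* refresh idle group offsets, sync group offsets, ... */ }
    }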
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
index f9b6617c13d..52bad401e8a 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/OffsetSyncStore.java
@@ -16,65 +16,121 @@
  */
 package org.apache.kafka.connect.mirror;
 
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TopicAdmin;
 
-import java.util.Map;
-import java.util.HashMap;
 import java.util.Collections;
-import java.time.Duration;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
+import java.util.concurrent.ConcurrentHashMap;
 
 /** Used internally by MirrorMaker. Stores offset syncs and performs offset translation. */
 class OffsetSyncStore implements AutoCloseable {
-    private final KafkaConsumer<byte[], byte[]> consumer;
-    private final Map<TopicPartition, OffsetSync> offsetSyncs = new HashMap<>();
-    private final TopicPartition offsetSyncTopicPartition;
+    private final KafkaBasedLog<byte[], byte[]> backingStore;
+    private final Map<TopicPartition, OffsetSync> offsetSyncs = new ConcurrentHashMap<>();
+    private final TopicAdmin admin;
+    protected volatile boolean readToEnd = false;
 
     OffsetSyncStore(MirrorConnectorConfig config) {
-        consumer = new KafkaConsumer<>(config.offsetSyncsTopicConsumerConfig(),
-            new ByteArrayDeserializer(), new ByteArrayDeserializer());
-        offsetSyncTopicPartition = new TopicPartition(config.offsetSyncsTopic(), 0);
-        consumer.assign(Collections.singleton(offsetSyncTopicPartition));
+        Consumer<byte[], byte[]> consumer = null;
+        TopicAdmin admin = null;
+        KafkaBasedLog<byte[], byte[]> store;
+        try {
+            consumer = MirrorUtils.newConsumer(config.offsetSyncsTopicConsumerConfig());
+            admin = new TopicAdmin(config.offsetSyncsTopicAdminConfig());
+            store = createBackingStore(config, consumer, admin);
+        } catch (Throwable t) {
+            Utils.closeQuietly(consumer, "consumer for offset syncs");
+            Utils.closeQuietly(admin, "admin client for offset syncs");
+            throw t;
+        }
+        this.admin = admin;
+        this.backingStore = store;
     }
 
-    // for testing
-    OffsetSyncStore(KafkaConsumer<byte[], byte[]> consumer, TopicPartition offsetSyncTopicPartition) {
-        this.consumer = consumer;
-        this.offsetSyncTopicPartition = offsetSyncTopicPartition;
+    private KafkaBasedLog<byte[], byte[]> createBackingStore(MirrorConnectorConfig config, Consumer<byte[], byte[]> consumer, TopicAdmin admin) {
+        return new KafkaBasedLog<byte[], byte[]>(
+                config.offsetSyncsTopic(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                () -> admin,
+                (error, record) -> this.handleRecord(record),
+                Time.SYSTEM,
+                ignored -> {
+                }
+        ) {
+            @Override
+            protected Producer<byte[], byte[]> createProducer() {
+                return null;
+            }
+
+            @Override
+            protected Consumer<byte[], byte[]> createConsumer() {
+                return consumer;
+            }
+
+            @Override
+            protected boolean readPartition(TopicPartition topicPartition) {
+                return topicPartition.partition() == 0;
+            }
+        };
+    }
+
+    OffsetSyncStore() {
+        this.admin = null;
+        this.backingStore = null;
+    }
+
+    /**
+     * Start the OffsetSyncStore, blocking until all previous Offset Syncs have been read from backing storage.
+     */
+    public void start() {
+        backingStore.start();
+        readToEnd = true;
     }
 
     OptionalLong translateDownstream(TopicPartition sourceTopicPartition, long upstreamOffset) {
+        if (!readToEnd) {
+            // If we have not read to the end of the syncs topic at least once, decline to translate any offsets.
+            // This prevents emitting stale offsets while initially reading the offset syncs topic.
+            return OptionalLong.empty();
+        }
         Optional<OffsetSync> offsetSync = latestOffsetSync(sourceTopicPartition);
         if (offsetSync.isPresent()) {
             if (offsetSync.get().upstreamOffset() > upstreamOffset) {
                 // Offset is too far in the past to translate accurately
                 return OptionalLong.of(-1L);
             }
-            long upstreamStep = upstreamOffset - offsetSync.get().upstreamOffset();
+            // If the consumer group is ahead of the offset sync, we can translate the upstream offset only 1
+            // downstream offset past the offset sync itself. This is because we know that future records must appear
+            // ahead of the offset sync, but we cannot estimate how many offsets from the upstream topic
+            // will be written vs dropped. If we overestimate, then we may skip the correct offset and have data loss.
+            // This also handles consumer groups at the end of a topic whose offsets point past the last valid record.
+            // This may cause re-reading of records depending on the age of the offset sync.
+            // s=offset sync pair, ?=record may or may not be replicated, g=consumer group offset, r=re-read record
+            // source |-s?????r???g-|
+            //          |  ______/
+            //          | /
+            //          vv
+            // target |-sg----r-----|
+            long upstreamStep = upstreamOffset == offsetSync.get().upstreamOffset() ? 0 : 1;
             return OptionalLong.of(offsetSync.get().downstreamOffset() + upstreamStep);
         } else {
             return OptionalLong.empty();
         }
     }
 
-    // poll and handle records
-    synchronized void update(Duration pollTimeout) {
-        try {
-            consumer.poll(pollTimeout).forEach(this::handleRecord);
-        } catch (WakeupException e) {
-            // swallow
-        }
-    }
-
-    public synchronized void close() {
-        consumer.wakeup();
-        Utils.closeQuietly(consumer, "offset sync store consumer");
+    @Override
+    public void close() {
+        Utils.closeQuietly(backingStore != null ? backingStore::stop : null, "backing store for offset syncs");
+        Utils.closeQuietly(admin, "admin client for offset syncs");
     }
 
     protected void handleRecord(ConsumerRecord<byte[], byte[]> record) {
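
To make the translation rule in the comment above concrete, here is a small stand-alone sketch (hypothetical names, not the OffsetSyncStore API) of the conservative mapping: given a sync pair (upstream=100, downstream=200), upstream offset 100 translates to exactly 200, any later upstream offset translates to 201 (one past the sync), anything older than the sync returns -1, and nothing is translated before the store has read to the end of the syncs topic.

    import java.util.OptionalLong;

    // Hypothetical sketch of the conservative offset translation rule described above.
    class TranslationSketch {
        private final long syncUpstream;
        private final long syncDownstream;
        private final boolean readToEnd;

        TranslationSketch(long syncUpstream, long syncDownstream, boolean readToEnd) {
            this.syncUpstream = syncUpstream;
            this.syncDownstream = syncDownstream;
            this.readToEnd = readToEnd;
        }

        OptionalLong translate(long upstreamOffset) {
            if (!readToEnd) {
                return OptionalLong.empty();          // never emit stale translations before catching up
            }
            if (syncUpstream > upstreamOffset) {
                return OptionalLong.of(-1L);          // sync is ahead of the group; can't translate accurately
            }
            // Step at most one offset past the sync; we can't know how many upstream offsets were dropped.
            long step = (upstreamOffset == syncUpstream) ? 0 : 1;
            return OptionalLong.of(syncDownstream + step);
        }

        public static void main(String[] args) {
            TranslationSketch store = new TranslationSketch(100, 200, true);
            System.out.println(store.translate(100)); // OptionalLong[200]  exact match
            System.out.println(store.translate(150)); // OptionalLong[201]  at most one past the sync
            System.out.println(store.translate(5));   // OptionalLong[-1]   older than the sync
        }
    }

The same rule is exercised by the updated OffsetSyncStoreTest expectations later in this patch.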
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 20735cd2334..500cb6c131a 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -52,11 +52,16 @@ public class MirrorCheckpointTaskTest {
 
     @Test
     public void testCheckpoint() {
+        long t1UpstreamOffset = 3L;
+        long t1DownstreamOffset = 4L;
+        long t2UpstreamOffset = 7L;
+        long t2DownstreamOffset = 8L;
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
-        offsetSyncStore.sync(new TopicPartition("topic1", 2), 3L, 4L);
-        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), 7L, 8L);
+        offsetSyncStore.sync(new TopicPartition("topic1", 2), t1UpstreamOffset, t1DownstreamOffset);
+        offsetSyncStore.sync(new TopicPartition("target2.topic5", 6), t2UpstreamOffset, t2DownstreamOffset);
         Optional<Checkpoint> optionalCheckpoint1 = mirrorCheckpointTask.checkpoint("group9", new TopicPartition("topic1", 2),
             new OffsetAndMetadata(10, null));
         assertTrue(optionalCheckpoint1.isPresent());
@@ -70,7 +75,7 @@ public class MirrorCheckpointTaskTest {
                 "checkpoint group9 sourcePartition failed");
         assertEquals(10, checkpoint1.upstreamOffset(),
                 "checkpoint group9 upstreamOffset failed");
-        assertEquals(11, checkpoint1.downstreamOffset(),
+        assertEquals(t1DownstreamOffset + 1, checkpoint1.downstreamOffset(),
                 "checkpoint group9 downstreamOffset failed");
         assertEquals(123L, sourceRecord1.timestamp().longValue(),
                 "checkpoint group9 timestamp failed");
@@ -87,10 +92,27 @@ public class MirrorCheckpointTaskTest {
                 "checkpoint group11 sourcePartition failed");
         assertEquals(12, checkpoint2.upstreamOffset(),
                 "checkpoint group11 upstreamOffset failed");
-        assertEquals(13, checkpoint2.downstreamOffset(),
+        assertEquals(t2DownstreamOffset + 1, checkpoint2.downstreamOffset(),
                 "checkpoint group11 downstreamOffset failed");
         assertEquals(234L, sourceRecord2.timestamp().longValue(),
                     "checkpoint group11 timestamp failed");
+        Optional<Checkpoint> optionalCheckpoint3 = mirrorCheckpointTask.checkpoint("group13", new TopicPartition("target2.topic5", 6),
+                new OffsetAndMetadata(7, null));
+        assertTrue(optionalCheckpoint3.isPresent());
+        Checkpoint checkpoint3 = optionalCheckpoint3.get();
+        SourceRecord sourceRecord3 = mirrorCheckpointTask.checkpointRecord(checkpoint3, 234L);
+        assertEquals(new TopicPartition("topic5", 6), checkpoint3.topicPartition(),
+                "checkpoint group13 topic5 failed");
+        assertEquals("group13", checkpoint3.consumerGroupId(),
+                "checkpoint group13 consumerGroupId failed");
+        assertEquals("group13", Checkpoint.unwrapGroup(sourceRecord3.sourcePartition()),
+                "checkpoint group13 sourcePartition failed");
+        assertEquals(t2UpstreamOffset, checkpoint3.upstreamOffset(),
+                "checkpoint group13 upstreamOffset failed");
+        assertEquals(t2DownstreamOffset, checkpoint3.downstreamOffset(),
+                "checkpoint group13 downstreamOffset failed");
+        assertEquals(234L, sourceRecord3.timestamp().longValue(),
+                "checkpoint group13 timestamp failed");
     }
 
     @Test
@@ -150,6 +172,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testNoCheckpointForTopicWithoutOffsetSyncs() {
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
                 new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
         offsetSyncStore.sync(new TopicPartition("topic1", 0), 3L, 4L);
@@ -165,6 +188,7 @@ public class MirrorCheckpointTaskTest {
     @Test
     public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
         OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        offsetSyncStore.start();
         MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
             new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
         offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
index 9224a088081..163e5b72250 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/OffsetSyncStoreTest.java
@@ -21,6 +21,8 @@ import org.apache.kafka.common.TopicPartition;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.OptionalLong;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class OffsetSyncStoreTest {
@@ -30,7 +32,13 @@ public class OffsetSyncStoreTest {
     static class FakeOffsetSyncStore extends OffsetSyncStore {
 
         FakeOffsetSyncStore() {
-            super(null, null);
+            super();
+        }
+
+        @Override
+        public void start() {
+            // do not call super to avoid NPE without a KafkaBasedLog.
+            readToEnd = true;
         }
 
         void sync(TopicPartition topicPartition, long upstreamOffset, long downstreamOffset) {
@@ -44,29 +52,57 @@ public class OffsetSyncStoreTest {
 
     @Test
     public void testOffsetTranslation() {
-        FakeOffsetSyncStore store = new FakeOffsetSyncStore();
-
-        store.sync(tp, 100, 200);
-        assertEquals(250L, store.translateDownstream(tp, 150).getAsLong(),
-                "Failure in translating downstream offset 250");
-
-        // Translate exact offsets
-        store.sync(tp, 150, 251);
-        assertEquals(251L, store.translateDownstream(tp, 150).getAsLong(),
-                "Failure in translating exact downstream offset 251");
-
-        // Use old offset (5) prior to any sync -> can't translate
-        assertEquals(-1, store.translateDownstream(tp, 5).getAsLong(),
-                "Expected old offset to not translate");
-
-        // Downstream offsets reset
-        store.sync(tp, 200, 10);
-        assertEquals(10L, store.translateDownstream(tp, 200).getAsLong(),
-                "Failure in resetting translation of downstream offset");
-
-        // Upstream offsets reset
-        store.sync(tp, 20, 20);
-        assertEquals(20L, store.translateDownstream(tp, 20).getAsLong(),
-                "Failure in resetting translation of upstream offset");
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+
+            // Emit synced downstream offset without dead-reckoning
+            store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 150));
+
+            // Translate exact offsets
+            store.sync(tp, 150, 251);
+            assertEquals(OptionalLong.of(251), store.translateDownstream(tp, 150));
+
+            // Use old offset (5) prior to any sync -> can't translate
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 5));
+
+            // Downstream offsets reset
+            store.sync(tp, 200, 10);
+            assertEquals(OptionalLong.of(10), store.translateDownstream(tp, 200));
+
+            // Upstream offsets reset
+            store.sync(tp, 20, 20);
+            assertEquals(OptionalLong.of(20), store.translateDownstream(tp, 20));
+        }
+    }
+
+    @Test
+    public void testNoTranslationIfStoreNotStarted() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            // no offsets exist and store is not started
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // read a sync during startup
+            store.sync(tp, 100, 200);
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 200));
+
+            // After the store is started all offsets are visible
+            store.start();
+            assertEquals(OptionalLong.of(-1), store.translateDownstream(tp, 0));
+            assertEquals(OptionalLong.of(200), store.translateDownstream(tp, 100));
+            assertEquals(OptionalLong.of(201), store.translateDownstream(tp, 200));
+        }
+    }
+
+    @Test
+    public void testNoTranslationIfNoOffsetSync() {
+        try (FakeOffsetSyncStore store = new FakeOffsetSyncStore()) {
+            store.start();
+            assertEquals(OptionalLong.empty(), store.translateDownstream(tp, 0));
+        }
     }
 }
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
index 56ae3f8ebf9..e8f01e2bc12 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/IdentityReplicationIntegrationTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.connect.mirror.MirrorClient;
 import org.apache.kafka.connect.mirror.MirrorHeartbeatConnector;
 import org.apache.kafka.connect.mirror.MirrorMakerConfig;
 
-import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -107,11 +106,8 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
                 "Checkpoints were not emitted downstream to backup cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        assertTrue(backupOffsets.containsKey(
-                new TopicPartition("test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "test-topic-1");
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -221,18 +217,19 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
         waitForTopicCreated(primary, "backup.test-topic-1");
         waitForTopicCreated(backup, "test-topic-1");
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-                consumerProps, "test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+                consumerProps, "test-topic-1")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("test-topic-1"),
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("test-topic-1"),
                 consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -244,22 +241,24 @@ public class IdentityReplicationIntegrationTest extends MirrorConnectorsIntegrat
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {
+                "group.id", consumerGroupName), "test-topic-2")) {
             // we need to wait for consuming all the records for MM2 replicating the expected offsets
             waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", consumerGroupName), "test-topic-1", "test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "group.id", consumerGroupName), "test-topic-1", "test-topic-2")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("test-topic-1", "test-topic-2"),
-                consumerGroupName, NUM_RECORDS_PRODUCED);
+            waitForConsumerGroupFullSync(backup, Arrays.asList("test-topic-1", "test-topic-2"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
+
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
     }
 
     /*
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 3d448e9c150..4c8340236e4 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -20,6 +20,8 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeConfigsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -37,7 +39,6 @@ import org.apache.kafka.connect.mirror.MirrorSourceConnector;
 import org.apache.kafka.connect.mirror.SourceAndTarget;
 import org.apache.kafka.connect.mirror.Checkpoint;
 import org.apache.kafka.connect.mirror.MirrorCheckpointConnector;
-import org.apache.kafka.connect.mirror.ReplicationPolicy;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
 import org.apache.kafka.connect.util.clusters.UngracefulShutdownException;
@@ -54,6 +55,8 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import org.junit.jupiter.api.Tag;
@@ -93,6 +96,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     private static final int OFFSET_SYNC_DURATION_MS = 30_000;
     private static final int TOPIC_SYNC_DURATION_MS = 60_000;
     private static final int REQUEST_TIMEOUT_DURATION_MS = 60_000;
+    private static final int CHECKPOINT_INTERVAL_DURATION_MS = 1_000;
     private static final int NUM_WORKERS = 3;
     protected static final Duration CONSUMER_POLL_TIMEOUT_MS = Duration.ofMillis(500L);
     protected static final String PRIMARY_CLUSTER_ALIAS = "primary";
@@ -115,7 +119,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected Properties backupBrokerProps = new Properties();
     protected Map<String, String> primaryWorkerProps = new HashMap<>();
     protected Map<String, String> backupWorkerProps = new HashMap<>();
-    
+
     @BeforeEach
     public void startClusters() throws Exception {
         startClusters(new HashMap<String, String>() {{
@@ -296,13 +300,8 @@ public class MirrorConnectorsIntegrationBaseTest {
         assertTrue(backup.kafka().consume(1, CHECKPOINT_DURATION_MS, "primary.checkpoints.internal").count() > 0,
             "Checkpoints were not emitted downstream to backup cluster.");
 
-        Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS));
-
-        for (int i = 0; i < NUM_PARTITIONS; i++) {
-            assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
-                   "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
-        }
+        Map<TopicPartition, OffsetAndMetadata> backupOffsets = waitForCheckpointOnAllPartitions(
+                backupClient, consumerGroupName, PRIMARY_CLUSTER_ALIAS, "primary.test-topic-1");
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {
@@ -318,11 +317,10 @@ public class MirrorConnectorsIntegrationBaseTest {
                 "Checkpoints were not emitted upstream to primary cluster.");
         }
 
-        waitForCondition(() -> primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-            Duration.ofMillis(CHECKPOINT_DURATION_MS)).containsKey(new TopicPartition("backup.test-topic-1", 0)), CHECKPOINT_DURATION_MS, "Offsets not translated downstream to primary cluster.");
+        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = waitForCheckpointOnAllPartitions(
+                primaryClient, consumerGroupName, BACKUP_CLUSTER_ALIAS, "backup.test-topic-1");
 
-        Map<TopicPartition, OffsetAndMetadata> primaryOffsets = primaryClient.remoteConsumerOffsets(consumerGroupName, BACKUP_CLUSTER_ALIAS,
-                Duration.ofMillis(CHECKPOINT_DURATION_MS));
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
  
         primaryClient.close();
         backupClient.close();
@@ -409,7 +407,7 @@ public class MirrorConnectorsIntegrationBaseTest {
             assertEquals(0, offset.offset(), "Offset of last partition is not zero");
         }
     }
-    
+
     @Test
     public void testOneWayReplicationWithAutoOffsetSync() throws InterruptedException {
         produceMessages(primary, "test-topic-1");
@@ -440,18 +438,19 @@ public class MirrorConnectorsIntegrationBaseTest {
         waitForTopicCreated(primary, "backup.test-topic-1");
         waitForTopicCreated(backup, "primary.test-topic-1");
         // create a consumer at backup cluster with same consumer group Id to consume 1 topic
-        Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
-            consumerProps, "primary.test-topic-1");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(
+            consumerProps, "primary.test-topic-1")) {
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Collections.singletonList("primary.test-topic-1"), 
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            waitForConsumerGroupFullSync(backup, Collections.singletonList("primary.test-topic-1"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
 
-        // the size of consumer record should be zero, because the offsets of the same consumer group
-        // have been automatically synchronized from primary to backup by the background job, so no
-        // more records to consume from the replicated topic by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
+            // the size of consumer record should be zero, because the offsets of the same consumer group
+            // have been automatically synchronized from primary to backup by the background job, so no
+            // more records to consume from the replicated topic by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
         // now create a new topic in primary cluster
         primary.kafka().createTopic("test-topic-2", NUM_PARTITIONS);
@@ -463,22 +462,24 @@ public class MirrorConnectorsIntegrationBaseTest {
 
         // create a consumer at primary cluster to consume the new topic
         try (Consumer<byte[], byte[]> consumer1 = primary.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-                "group.id", "consumer-group-1"), "test-topic-2")) {
+                "group.id", consumerGroupName), "test-topic-2")) {
             // we need to wait for consuming all the records for MM2 replicating the expected offsets
             waitForConsumingAllRecords(consumer1, NUM_RECORDS_PRODUCED);
         }
 
         // create a consumer at backup cluster with same consumer group Id to consume old and new topic
-        backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
-            "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2");
+        try (Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+            "group.id", consumerGroupName), "primary.test-topic-1", "primary.test-topic-2")) {
+
+            waitForConsumerGroupFullSync(backup, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"),
+                    consumerGroupName, NUM_RECORDS_PRODUCED);
 
-        waitForConsumerGroupOffsetSync(backup, backupConsumer, Arrays.asList("primary.test-topic-1", "primary.test-topic-2"), 
-            consumerGroupName, NUM_RECORDS_PRODUCED);
+            ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
+            assertEquals(0, records.count(), "consumer record size is not zero");
+        }
 
-        records = backupConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        // similar reasoning as above, no more records to consume by the same consumer group at backup cluster
-        assertEquals(0, records.count(), "consumer record size is not zero");
-        backupConsumer.close();
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
     }
 
     @Test
@@ -495,10 +496,14 @@ public class MirrorConnectorsIntegrationBaseTest {
         // Ensure the offset syncs topic is created in the target cluster
         waitForTopicCreated(backup.kafka(), "mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal");
 
+        String consumerGroupName = "consumer-group-syncs-on-target";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+
         produceMessages(primary, "test-topic-1");
 
-        ReplicationPolicy replicationPolicy = new MirrorClient(mm2Config.clientConfig(BACKUP_CLUSTER_ALIAS)).replicationPolicy();
-        String remoteTopic = replicationPolicy.formatRemoteTopic(PRIMARY_CLUSTER_ALIAS, "test-topic-1");
+        warmUpConsumer(consumerProps);
+
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
 
         // Check offsets are pushed to the checkpoint topic
         Consumer<byte[], byte[]> backupConsumer = backup.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
@@ -516,6 +521,8 @@ public class MirrorConnectorsIntegrationBaseTest {
             "Unable to find checkpoints for " + PRIMARY_CLUSTER_ALIAS + ".test-topic-1"
         );
 
+        assertMonotonicCheckpoints(backup, PRIMARY_CLUSTER_ALIAS + ".checkpoints.internal");
+
         // Ensure no offset-syncs topics have been created on the primary cluster
         Set<String> primaryTopics = primary.kafka().createAdminClient().listTopics().names().get();
         assertFalse(primaryTopics.contains("mm2-offset-syncs." + PRIMARY_CLUSTER_ALIAS + ".internal"));
@@ -568,6 +575,17 @@ public class MirrorConnectorsIntegrationBaseTest {
         // Send some records to test-topic-no-checkpoints in the source cluster
         produceMessages(primary, "test-topic-no-checkpoints");
 
+        try (Consumer<byte[], byte[]> consumer = primary.kafka().createConsumer(consumerProps)) {
+            Collection<TopicPartition> tps = Arrays.asList(tp1, tp2);
+            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps);
+            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = endOffsets.entrySet().stream()
+                    .collect(Collectors.toMap(
+                            Map.Entry::getKey,
+                            e -> new OffsetAndMetadata(e.getValue())
+                    ));
+            consumer.commitSync(offsetsToCommit);
+        }
+
         waitForCondition(() -> {
             Map<TopicPartition, OffsetAndMetadata> translatedOffsets = backupClient.remoteConsumerOffsets(
                     consumerGroupName, PRIMARY_CLUSTER_ALIAS, Duration.ofSeconds(30L));
@@ -576,6 +594,33 @@ public class MirrorConnectorsIntegrationBaseTest {
         }, OFFSET_SYNC_DURATION_MS, "Checkpoints were not emitted correctly to backup cluster");
     }
 
+    @Test
+    public void testRestartReplication() throws InterruptedException {
+        String consumerGroupName = "consumer-group-restart";
+        Map<String, Object> consumerProps = Collections.singletonMap("group.id", consumerGroupName);
+        String remoteTopic = remoteTopicName("test-topic-1", PRIMARY_CLUSTER_ALIAS);
+        warmUpConsumer(consumerProps);
+        mm2Props.put("sync.group.offsets.enabled", "true");
+        mm2Props.put("sync.group.offsets.interval.seconds", "1");
+        mm2Config = new MirrorMakerConfig(mm2Props);
+        waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, NUM_RECORDS_PRODUCED);
+        restartMirrorMakerConnectors(backup, CONNECTOR_LIST);
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+        Thread.sleep(5000);
+        produceMessages(primary, "test-topic-1");
+        try (Consumer<byte[], byte[]> primaryConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            waitForConsumingAllRecords(primaryConsumer, NUM_RECORDS_PRODUCED);
+        }
+        waitForConsumerGroupFullSync(backup, Collections.singletonList(remoteTopic), consumerGroupName, 2 * NUM_RECORDS_PRODUCED);
+        assertMonotonicCheckpoints(backup, "primary.checkpoints.internal");
+    }
+
+
     private TopicPartition remoteTopicPartition(TopicPartition tp, String alias) {
         return new TopicPartition(remoteTopicName(tp.topic(), alias), tp.partition());
     }
@@ -638,6 +683,12 @@ public class MirrorConnectorsIntegrationBaseTest {
         }
     }
 
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+        for (Class<? extends Connector> connector : connectorClasses) {
+            connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
+        }
+    }
+
     /*
      * wait for the topic created on the cluster
      */
@@ -680,7 +731,7 @@ public class MirrorConnectorsIntegrationBaseTest {
     protected void produceMessages(EmbeddedConnectCluster cluster, String topicName) {
         Map<String, String> recordSent = generateRecords(NUM_RECORDS_PRODUCED);
         for (Map.Entry<String, String> entry : recordSent.entrySet()) {
-            cluster.kafka().produce(topicName, entry.getKey(), entry.getValue());
+            produce(cluster.kafka(), topicName, null, entry.getKey(), entry.getValue());
         }
     }
 
@@ -691,16 +742,70 @@ public class MirrorConnectorsIntegrationBaseTest {
         int cnt = 0;
         for (int r = 0; r < NUM_RECORDS_PER_PARTITION; r++)
             for (int p = 0; p < numPartitions; p++)
-                cluster.kafka().produce(topicName, p, "key", "value-" + cnt++);
+                produce(cluster.kafka(), topicName, p, "key", "value-" + cnt++);
     }
-    
+
+    /**
+     * Produce a test record to a Kafka cluster.
+     * This method allows subclasses to configure and use their own Kafka Producer instead of using the built-in default.
+     * @param cluster   Kafka cluster that should receive the record
+     * @param topic     Topic to send the record to, non-null
+     * @param partition Partition to send the record to, maybe null.
+     * @param key       Kafka key for the record
+     * @param value     Kafka value for the record
+     */
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        cluster.produce(topic, partition, key, value);
+    }
+
+    protected static Map<TopicPartition, OffsetAndMetadata> waitForCheckpointOnAllPartitions(
+            MirrorClient client, String consumerGroupName, String remoteClusterAlias, String topicName
+    ) throws InterruptedException {
+        AtomicReference<Map<TopicPartition, OffsetAndMetadata>> ret = new AtomicReference<>();
+        waitForCondition(
+                () -> {
+                    Map<TopicPartition, OffsetAndMetadata> offsets = client.remoteConsumerOffsets(
+                            consumerGroupName, remoteClusterAlias, Duration.ofMillis(3000));
+                    for (int i = 0; i < NUM_PARTITIONS; i++) {
+                        if (!offsets.containsKey(new TopicPartition(topicName, i))) {
+                            log.info("Checkpoint is missing for {}: {}-{}", consumerGroupName, topicName, i);
+                            return false;
+                        }
+                    }
+                    ret.set(offsets);
+                    return true;
+                },
+                CHECKPOINT_DURATION_MS,
+                String.format(
+                        "Offsets for consumer group %s not translated from %s for topic %s",
+                        consumerGroupName,
+                        remoteClusterAlias,
+                        topicName
+                )
+        );
+        return ret.get();
+    }
+
     /*
      * given consumer group, topics and expected number of records, make sure the consumer group
      * offsets are eventually synced to the expected offset numbers
      */
-    protected static <T> void waitForConsumerGroupOffsetSync(EmbeddedConnectCluster connect,
-            Consumer<T, T> consumer, List<String> topics, String consumerGroupId, int numRecords)
-            throws InterruptedException {
+    protected static <T> void waitForConsumerGroupFullSync(
+            EmbeddedConnectCluster connect, List<String> topics, String consumerGroupId, int numRecords
+    ) throws InterruptedException {
+        int expectedRecords = numRecords * topics.size();
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.put("isolation.level", "read_committed");
+        consumerProps.put("auto.offset.reset", "earliest");
+        Map<TopicPartition, Long> lastOffset = new HashMap<>();
+        try (Consumer<byte[], byte[]> consumer = connect.kafka().createConsumerAndSubscribeTo(consumerProps, topics.toArray(new String[0]))) {
+            final AtomicInteger totalConsumedRecords = new AtomicInteger(0);
+            waitForCondition(() -> {
+                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+                records.forEach(record -> lastOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset()));
+                return expectedRecords == totalConsumedRecords.addAndGet(records.count());
+            }, RECORD_CONSUME_DURATION_MS, "Consumer cannot consume all records in time");
+        }
         try (Admin adminClient = connect.kafka().createAdminClient()) {
             List<TopicPartition> tps = new ArrayList<>(NUM_PARTITIONS * topics.size());
             for (int partitionIndex = 0; partitionIndex < NUM_PARTITIONS; partitionIndex++) {
@@ -708,23 +813,54 @@ public class MirrorConnectorsIntegrationBaseTest {
                     tps.add(new TopicPartition(topic, partitionIndex));
                 }
             }
-            long expectedTotalOffsets = numRecords * topics.size();
 
             waitForCondition(() -> {
                 Map<TopicPartition, OffsetAndMetadata> consumerGroupOffsets =
-                    adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
-                long consumerGroupOffsetTotal = consumerGroupOffsets.values().stream()
-                    .mapToLong(OffsetAndMetadata::offset).sum();
-
-                Map<TopicPartition, Long> offsets = consumer.endOffsets(tps, CONSUMER_POLL_TIMEOUT_MS);
-                long totalOffsets = offsets.values().stream().mapToLong(l -> l).sum();
-
-                // make sure the consumer group offsets are synced to expected number
-                return totalOffsets == expectedTotalOffsets && consumerGroupOffsetTotal > 0;
+                        adminClient.listConsumerGroupOffsets(consumerGroupId).partitionsToOffsetAndMetadata().get();
+                Map<TopicPartition, OffsetSpec> endOffsetRequest = tps.stream()
+                        .collect(Collectors.toMap(Function.identity(), ignored -> OffsetSpec.latest()));
+                Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
+                        adminClient.listOffsets(endOffsetRequest).all().get();
+
+                for (TopicPartition tp : tps) {
+                    assertTrue(consumerGroupOffsets.containsKey(tp),
+                            "TopicPartition " + tp + " does not have translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() > lastOffset.get(tp),
+                            "TopicPartition " + tp + " does not have fully-translated offsets");
+                    assertTrue(consumerGroupOffsets.get(tp).offset() <= endOffsets.get(tp).offset(),
+                            "TopicPartition " + tp + " has downstream offsets beyond the log end, this would lead to negative lag metrics");
+                }
+                return true;
             }, OFFSET_SYNC_DURATION_MS, "Consumer group offset sync is not complete in time");
         }
     }
 
+    protected static void assertMonotonicCheckpoints(EmbeddedConnectCluster cluster, String checkpointTopic) {
+        TopicPartition checkpointTopicPartition = new TopicPartition(checkpointTopic, 0);
+        try (Consumer<byte[], byte[]> backupConsumer = cluster.kafka().createConsumerAndSubscribeTo(Collections.singletonMap(
+                "auto.offset.reset", "earliest"), checkpointTopic)) {
+            Map<String, Map<TopicPartition, Checkpoint>> checkpointsByGroup = new HashMap<>();
+            long deadline = System.currentTimeMillis() + CHECKPOINT_DURATION_MS;
+            do {
+                ConsumerRecords<byte[], byte[]> records = backupConsumer.poll(Duration.ofSeconds(1L));
+                for (ConsumerRecord<byte[], byte[]> record : records) {
+                    Checkpoint checkpoint = Checkpoint.deserializeRecord(record);
+                    Map<TopicPartition, Checkpoint> lastCheckpoints = checkpointsByGroup.computeIfAbsent(
+                            checkpoint.consumerGroupId(),
+                            ignored -> new HashMap<>());
+                    Checkpoint lastCheckpoint = lastCheckpoints.getOrDefault(checkpoint.topicPartition(), checkpoint);
+                    assertTrue(checkpoint.downstreamOffset() >= lastCheckpoint.downstreamOffset(),
+                            "Checkpoint was non-monotonic for "
+                                    + checkpoint.consumerGroupId()
+                                    + ": "
+                                    + checkpoint.topicPartition());
+                    lastCheckpoints.put(checkpoint.topicPartition(), checkpoint);
+                }
+            } while (backupConsumer.currentLag(checkpointTopicPartition).orElse(1) > 0 && System.currentTimeMillis() < deadline);
+            assertEquals(0, backupConsumer.currentLag(checkpointTopicPartition).orElse(1), "Unable to read all checkpoints within " + CHECKPOINT_DURATION_MS + "ms");
+        }
+    }
+
     /*
      * make sure the consumer to consume expected number of records
      */
@@ -747,7 +883,7 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("max.tasks", "10");
         mm2Props.put("groups", "consumer-group-.*");
         mm2Props.put("sync.topic.acls.enabled", "false");
-        mm2Props.put("emit.checkpoints.interval.seconds", "1");
+        mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));
         mm2Props.put("emit.heartbeats.interval.seconds", "1");
         mm2Props.put("refresh.topics.interval.seconds", "1");
         mm2Props.put("refresh.groups.interval.seconds", "1");
@@ -758,7 +894,10 @@ public class MirrorConnectorsIntegrationBaseTest {
         mm2Props.put("offset.storage.replication.factor", "1");
         mm2Props.put("status.storage.replication.factor", "1");
         mm2Props.put("replication.factor", "1");
-        
+        // Sync offsets as soon as possible to ensure the final record in a finite test has its offset translated.
+        mm2Props.put("offset.lag.max", "0");
+        mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
+        mm2Props.put(BACKUP_CLUSTER_ALIAS + ".offset.flush.interval.ms", "5000");
         return mm2Props;
     }
     
@@ -785,14 +924,14 @@ public class MirrorConnectorsIntegrationBaseTest {
      * Generate some consumer activity on both clusters to ensure the checkpoint connector always starts promptly
      */
     protected void warmUpConsumer(Map<String, Object> consumerProps) {
-        Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
-        dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1");
-        dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
-        dummyConsumer.commitSync();
-        dummyConsumer.close();
+        try (Consumer<byte[], byte[]> dummyConsumer = primary.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
+        try (Consumer<byte[], byte[]> dummyConsumer = backup.kafka().createConsumerAndSubscribeTo(consumerProps, "test-topic-1")) {
+            dummyConsumer.poll(CONSUMER_POLL_TIMEOUT_MS);
+            dummyConsumer.commitSync();
+        }
     }
 
     /*
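
A brief note on the new upper-bound assertion in waitForConsumerGroupFullSync
above: consumer lag for a partition is reported as the log end offset minus the
group's committed offset, so a translated offset committed beyond the
downstream log end would surface as negative lag. As an illustrative example
(numbers not taken from the test), a downstream partition with log end offset
100 and a committed offset of 105 would report a lag of 100 - 105 = -5, which
is exactly what that check guards against.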
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
new file mode 100644
index 00000000000..6ac09ef32bb
--- /dev/null
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationTransactionsTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.mirror.integration;
+
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Integration test for MirrorMaker2 in which source records are emitted with a transactional producer,
+ * which interleaves transaction commit messages into the source topic which are not propagated downstream.
+ */
+public class MirrorConnectorsIntegrationTransactionsTest extends MirrorConnectorsIntegrationBaseTest {
+
+    private Map<String, Object> producerProps = new HashMap<>();
+
+    @BeforeEach
+    @Override
+    public void startClusters() throws Exception {
+        primaryBrokerProps.put("transaction.state.log.replication.factor", "1");
+        backupBrokerProps.put("transaction.state.log.replication.factor", "1");
+        primaryBrokerProps.put("transaction.state.log.min.isr", "1");
+        backupBrokerProps.put("transaction.state.log.min.isr", "1");
+        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
+        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "embedded-kafka-0");
+        super.startClusters();
+    }
+
+    /**
+     * Produce records with a short-lived transactional producer to interleave transaction markers in the topic.
+     */
+    @Override
+    protected void produce(EmbeddedKafkaCluster cluster, String topic, Integer partition, String key, String value) {
+        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes());
+        try (Producer<byte[], byte[]> producer = cluster.createProducer(producerProps)) {
+            producer.initTransactions();
+            producer.beginTransaction();
+            producer.send(msg).get(120000, TimeUnit.MILLISECONDS);
+            producer.commitTransaction();
+        } catch (Exception e) {
+            throw new KafkaException("Could not produce message: " + msg, e);
+        }
+    }
+}
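
For context on why this variant of the test matters: every single-record
transaction committed above is followed by a one-offset transaction marker in
the source partition, and consumers never see those control records. As an
illustrative example, ten such transactions leave the source partition with a
log end offset of 20 (data records at offsets 0, 2, ..., 18) while the
replicated partition, written without transactions, ends at offset 10, so
downstream offsets cannot be assumed to mirror upstream offsets and have to be
derived from offset syncs and checkpoints instead.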
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index 899b42dd877..431ae871ce9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -257,8 +257,16 @@ public class KafkaBasedLog<K, V> {
                     " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
                     " this is your first use of the topic it may have taken too long to create.");
 
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        for (PartitionInfo partition : partitionInfos) {
+            TopicPartition topicPartition = new TopicPartition(partition.topic(), partition.partition());
+            if (readPartition(topicPartition)) {
+                partitions.add(topicPartition);
+            }
+        }
+        if (partitions.isEmpty()) {
+            throw new ConnectException("Some partitions for " + topic + " exist, but no partitions matched the " +
+                    "required filter.");
+        }
         partitionCount = partitions.size();
         consumer.assign(partitions);
 
@@ -392,6 +400,18 @@ public class KafkaBasedLog<K, V> {
         return new KafkaConsumer<>(consumerConfigs);
     }
 
+    /**
+     * Signals whether a topic partition should be read by this log. Invoked on {@link #start() startup} once
+     * for every partition found in the log's backing topic.
+     * <p>This method can be overridden by subclasses when only a subset of the assigned partitions
+     * should be read into memory. By default, all partitions are read.
+     * @param topicPartition A topic partition which could be read by this log.
+     * @return true if the partition should be read by this log, false if its contents should be ignored.
+     */
+    protected boolean readPartition(TopicPartition topicPartition) {
+        return true;
+    }
+
     private void poll(long timeoutMs) {
         try {
             ConsumerRecords<K, V> records = consumer.poll(Duration.ofMillis(timeoutMs));
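
The readPartition hook added above is the extension point that lets a
KafkaBasedLog subclass materialize only a subset of the backing topic's
partitions; by default every partition is still read. A sketch of what an
override might look like, assuming a hypothetical subclass that only cares
about partition 0 (only the override is shown; the enclosing subclass and its
constructor, which would simply forward to KafkaBasedLog, are omitted here):

    @Override
    protected boolean readPartition(TopicPartition topicPartition) {
        // e.g. an offset-syncs reader that only ever consumes partition 0
        return topicPartition.partition() == 0;
    }

With such an override, start() assigns and reads only the accepted partitions,
and fails with a ConnectException if the filter rejects every partition that
exists for the topic.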
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
index f9defc77ca2..691852430a0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/TopicAdmin.java
@@ -282,8 +282,7 @@ public class TopicAdmin implements AutoCloseable {
         this(adminConfig.get(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), Admin.create(adminConfig));
     }
 
-    // visible for testing
-    TopicAdmin(Object bootstrapServers, Admin adminClient) {
+    public TopicAdmin(Object bootstrapServers, Admin adminClient) {
         this(bootstrapServers, adminClient, true);
     }
 


[kafka] 03/04: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)

Posted by ce...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 51ee89a598b474108363bdb9677842eb21148a82
Author: csolidum <ch...@gmail.com>
AuthorDate: Wed Jan 4 03:02:52 2023 -0800

    KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gh...@gmail.com>
---
 .../apache/kafka/connect/mirror/MirrorCheckpointTask.java  | 14 ++++++++------
 .../kafka/connect/mirror/MirrorCheckpointTaskTest.java     | 10 ++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 959961812ea..29483c126b3 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -188,14 +188,16 @@ public class MirrorCheckpointTask extends SourceTask {
 
     Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
                                     OffsetAndMetadata offsetAndMetadata) {
-        long upstreamOffset = offsetAndMetadata.offset();
-        OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
-        if (downstreamOffset.isPresent()) {
-            return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
+        if (offsetAndMetadata != null) {
+            long upstreamOffset = offsetAndMetadata.offset();
+            OptionalLong downstreamOffset =
+                offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+            if (downstreamOffset.isPresent()) {
+                return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
                     upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
-        } else {
-            return Optional.empty();
+            }
         }
+        return Optional.empty();
     }
 
     SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 54fe678e73a..20735cd2334 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -161,4 +161,14 @@ public class MirrorCheckpointTaskTest {
         assertFalse(checkpoint1.isPresent());
         assertTrue(checkpoint2.isPresent());
     }
+
+    @Test
+    public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
+        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+            new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
+        offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
+        Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
+        assertFalse(checkpoint.isPresent());
+    }
 }
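
For context: the group offsets that feed checkpoint() are looked up through the
admin client, and that lookup can map a partition to a null OffsetAndMetadata
(for example when offsets are requested for partitions the group has never
committed to), which is the case the new null guard covers. A minimal
caller-side sketch of the same defensive pattern (illustrative only; the class
and method names are made up and not part of this commit):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class NullOffsetGuardSketch {
        // Drop entries with no usable committed offset; no checkpoint can be
        // emitted for them, mirroring the guard in MirrorCheckpointTask.checkpoint.
        static Map<TopicPartition, OffsetAndMetadata> withoutNullOffsets(
                Map<TopicPartition, OffsetAndMetadata> offsets) {
            Map<TopicPartition, OffsetAndMetadata> filtered = new HashMap<>();
            offsets.forEach((tp, oam) -> {
                if (oam != null) {
                    filtered.put(tp, oam);
                }
            });
            return filtered;
        }
    }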