Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 13:19:09 UTC

[kafka] 01/04: KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 55e69a0db81713f700635f96cce56351abfe8ba3
Author: emilnkrastev <em...@gmail.com>
AuthorDate: Tue Jan 10 16:46:25 2023 +0200

    KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)
    
    Reviewers: Greg Harris <gr...@aiven.io>, Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     |  22 +++-
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 139 ++++++++++++++++++++-
 .../MirrorConnectorsIntegrationBaseTest.java       |   6 +-
 3 files changed, 156 insertions(+), 11 deletions(-)
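
Context for the patch below: before this change, PartitionState.update() advanced the partition's last-synced offsets and the caller then fired sendOffsetSync() without checking whether the sync actually went out. When the outstandingOffsetSyncs semaphore was exhausted, the sync record was silently dropped while the internal state already claimed it had been emitted, so offset translation for that partition could stall until some later update happened to trigger a new sync. The fix threads a success signal back from sendOffsetSync() and keeps the "sync needed" flag set until a send really succeeds. The following is a self-contained model of that control flow, not the real class: names in comments map to the actual code in the diff below.

    import java.util.concurrent.Semaphore;

    // Hypothetical model class; the real code is MirrorSourceTask plus its
    // inner PartitionState, patched in the diff that follows.
    class StickyOffsetSyncModel {
        private final Semaphore outstandingOffsetSyncs = new Semaphore(10);
        private boolean shouldSyncOffsets;   // now a field, so it survives a failed send

        // Stands in for PartitionState.update(): mark a sync as needed, but
        // never clear the flag here -- that is reset()'s job, on success only.
        boolean update(boolean syncConditionMet) {
            if (syncConditionMet) {
                shouldSyncOffsets = true;
            }
            return shouldSyncOffsets;
        }

        // Stands in for PartitionState.reset().
        void reset() {
            shouldSyncOffsets = false;
        }

        // Stands in for sendOffsetSync(): now reports whether the record was
        // actually handed to the producer instead of failing silently.
        boolean sendOffsetSync() {
            if (!outstandingOffsetSyncs.tryAcquire()) {
                return false;   // too many in-flight syncs; caller keeps the flag set
            }
            // producer.send(record, (metadata, error) -> outstandingOffsetSyncs.release());
            return true;
        }

        // Stands in for the commitRecord() path: reset only after a confirmed
        // send, so a dropped sync is retried on a later commit, not lost.
        void maybeSyncOffsets(boolean syncConditionMet) {
            if (update(syncConditionMet) && sendOffsetSync()) {
                reset();
            }
        }
    }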

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index da4697ddf25..5635eb7189d 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -69,7 +69,9 @@ public class MirrorSourceTask extends SourceTask {
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorSourceMetrics metrics, String sourceClusterAlias,
-                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer) {
+                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer,
+                     Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates,
+                     String offsetSyncsTopic) {
         this.consumer = consumer;
         this.metrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
@@ -77,6 +79,9 @@ public class MirrorSourceTask extends SourceTask {
         this.maxOffsetLag = maxOffsetLag;
         consumerAccess = new Semaphore(1);
         this.offsetProducer = producer;
+        this.outstandingOffsetSyncs = outstandingOffsetSyncs;
+        this.partitionStates = partitionStates;
+        this.offsetSyncsTopic = offsetSyncsTopic;
     }
 
     @Override
@@ -198,16 +203,18 @@ public class MirrorSourceTask extends SourceTask {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
+                partitionState.reset();
+            }
         }
     }
 
     // sends OffsetSync record upstream to internal offsets topic
-    private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
+    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
             long downstreamOffset) {
         if (!outstandingOffsetSyncs.tryAcquire()) {
             // Too many outstanding offset syncs.
-            return;
+            return false;
         }
         OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
@@ -221,6 +228,7 @@ public class MirrorSourceTask extends SourceTask {
             }
             outstandingOffsetSyncs.release();
         });
+        return true;
     }
  
     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
@@ -272,6 +280,7 @@ public class MirrorSourceTask extends SourceTask {
         long lastSyncUpstreamOffset = -1L;
         long lastSyncDownstreamOffset = -1L;
         long maxOffsetLag;
+        boolean shouldSyncOffsets;
 
         PartitionState(long maxOffsetLag) {
             this.maxOffsetLag = maxOffsetLag;
@@ -279,7 +288,6 @@ public class MirrorSourceTask extends SourceTask {
 
         // true if we should emit an offset sync
         boolean update(long upstreamOffset, long downstreamOffset) {
-            boolean shouldSyncOffsets = false;
             long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
             long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
             if (lastSyncDownstreamOffset == -1L
@@ -294,5 +302,9 @@ public class MirrorSourceTask extends SourceTask {
             previousDownstreamOffset = downstreamOffset;
             return shouldSyncOffsets;
         }
+
+        void reset() {
+            shouldSyncOffsets = false;
+        }
     }
 }
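
The hunks above are the heart of the fix: shouldSyncOffsets is promoted from a local variable inside update() to a field, so once update() requests a sync it keeps returning true on every subsequent call until reset() clears it, and the caller now invokes reset() only after sendOffsetSync() confirms the record was handed to the producer. A short illustration of the resulting contract, written against the patched PartitionState (as in the unit tests below, this assumes code living in the same package, since PartitionState is package-private; the exact lag/step thresholds are in parts of update() not shown in this hunk):

    MirrorSourceTask.PartitionState state = new MirrorSourceTask.PartitionState(50);

    state.update(0, 100);   // true: the first update always requests a sync
    state.update(1, 101);   // still true: no reset() yet, so the pending sync carries over
    state.reset();          // caller does this only after a successful sendOffsetSync()
    state.update(2, 102);   // false: flag cleared, and neither lag nor step checks trigger
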
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 14cf4143c90..9dfcf807ed2 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -19,26 +19,37 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
 import org.apache.kafka.connect.source.SourceRecord;
 
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.Semaphore;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verifyNoInteractions;
 
@@ -56,7 +67,7 @@ public class MirrorSourceTaskTest {
         @SuppressWarnings("unchecked")
         KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
-                new DefaultReplicationPolicy(), 50, producer);
+                new DefaultReplicationPolicy(), 50, producer, null, null, null);
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
         assertEquals("cluster7.topic1", sourceRecord.topic(),
                 "Failure on cluster7.topic1 consumerRecord serde");
@@ -81,15 +92,33 @@ public class MirrorSourceTaskTest {
         MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);
 
         assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
         assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 152), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(4, 153), "no sync");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 154), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 205), "one past target offset");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 206), "upstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(3, 207), "no sync");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "downstream reset");
+        partitionState.reset();
         assertFalse(partitionState.update(5, 4), "no sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(7, 6), "sync");
+        assertTrue(partitionState.update(8, 7), "sync");
+        assertTrue(partitionState.update(10, 57), "sync");
+        partitionState.reset();
+        assertFalse(partitionState.update(11, 58), "sync");
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
     }
 
     @Test
@@ -98,15 +127,32 @@ public class MirrorSourceTaskTest {
 
         // if max offset lag is zero, should always emit offset syncs
         assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
+        assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+        partitionState.reset();
+        assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
         assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect");
+        partitionState.reset();
         assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+        assertTrue(partitionState.update(8, 7), "zeroOffsetSync downStreamOffset 7 is incorrect");
+        assertTrue(partitionState.update(10, 57), "zeroOffsetSync downStreamOffset 57 is incorrect");
+        partitionState.reset();
+        assertTrue(partitionState.update(11, 58), "zeroOffsetSync downStreamOffset 58 is incorrect");
     }
 
     @Test
@@ -140,7 +186,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
-                replicationPolicy, 50, producer);
+                replicationPolicy, 50, producer, null, null, null);
         List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
 
         assertEquals(2, sourceRecords.size());
@@ -186,7 +232,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
-                replicationPolicy, 50, producer);
+                replicationPolicy, 50, producer, null, null, null);
 
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
                 TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
@@ -196,6 +242,91 @@ public class MirrorSourceTaskTest {
         verifyNoInteractions(producer);
     }
 
+    @Test
+    public void testSendSyncEvent() {
+        byte[] recordKey = "key".getBytes();
+        byte[] recordValue = "value".getBytes();
+        int maxOffsetLag = 50;
+        int recordPartition = 0;
+        int recordOffset = 0;
+        int metadataOffset = 100;
+        String topicName = "topic";
+        String sourceClusterName = "sourceCluster";
+
+        RecordHeaders headers = new RecordHeaders();
+        ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+        Semaphore outstandingOffsetSyncs = new Semaphore(1);
+        PartitionState partitionState = new PartitionState(maxOffsetLag);
+        Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
+                replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName);
+
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
+        partitionStates.put(sourceTopicPartition, partitionState);
+        RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+
+        ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class);
+        when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> {
+            producerCallback.getValue().onCompletion(null, null);
+            return null;
+        });
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(1)).send(any(), any());
+
+        recordOffset = 2;
+        metadataOffset = 102;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        // Do not release outstanding sync semaphore
+        doReturn(null).when(producer).send(any(), producerCallback.capture());
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(2)).send(any(), any());
+
+        // Do not send sync event
+        recordOffset = 4;
+        metadataOffset = 104;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(2)).send(any(), any());
+
+        // Should send sync event
+        recordOffset = 5;
+        metadataOffset = 150;
+        recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+        sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+                recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+                recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+        producerCallback.getValue().onCompletion(null, null);
+
+        mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+        verify(producer, times(3)).send(any(), any());
+    }
+
     private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
         assertEquals(expectedHeaders.size(), taskHeaders.size());
         for (int i = 0; i < expectedHeaders.size(); i++) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 9b744cb3e99..5ca9b110707 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -305,8 +305,10 @@ public class MirrorConnectorsIntegrationBaseTest {
         Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
             Duration.ofMillis(CHECKPOINT_DURATION_MS));
 
-        assertTrue(backupOffsets.containsKey(
-            new TopicPartition("primary.test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        for (int i = 0; i < NUM_PARTITIONS; i++) {
+            assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
+                   "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+        }
 
         // Failover consumer group to backup cluster.
         try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {