Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 13:19:10 UTC

[kafka] 02/04: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit a4b33bd0a5a757298d8a910f013c8b4ae24835c2
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Mon Feb 6 04:53:58 2023 -0500

    KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gh...@gmail.com>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     | 62 ++++++++++++++++------
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 31 +++++++++--
 2 files changed, 71 insertions(+), 22 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 5635eb7189d..09de13aff34 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
@@ -62,6 +64,7 @@ public class MirrorSourceTask extends SourceTask {
     private ReplicationPolicy replicationPolicy;
     private MirrorSourceMetrics metrics;
     private boolean stopping = false;
+    private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
     private Semaphore outstandingOffsetSyncs;
     private Semaphore consumerAccess;
 
@@ -87,6 +90,7 @@ public class MirrorSourceTask extends SourceTask {
     @Override
     public void start(Map<String, String> props) {
         MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+        pendingOffsetSyncs.clear();
         outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
         consumerAccess = new Semaphore(1);  // let one thread at a time access the consumer
         sourceClusterAlias = config.sourceClusterAlias();
@@ -111,7 +115,9 @@ public class MirrorSourceTask extends SourceTask {
 
     @Override
     public void commit() {
-        // nop
+        // Publish any offset syncs that we've queued up, but have not yet been able to publish
+        // (likely because we previously reached our limit for number of outstanding syncs)
+        firePendingOffsetSyncs();
     }
 
     @Override
@@ -194,41 +200,63 @@ public class MirrorSourceTask extends SourceTask {
         TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
         long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
         long downstreamOffset = metadata.offset();
-        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+        // We may be able to immediately publish an offset sync that we've queued up here
+        firePendingOffsetSyncs();
     }
 
-    // updates partition state and sends OffsetSync if necessary
-    private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
+    // updates partition state and queues up OffsetSync if necessary
+    private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+                                       long downstreamOffset) {
         PartitionState partitionState =
             partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
         if (partitionState.update(upstreamOffset, downstreamOffset)) {
-            if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
-                partitionState.reset();
+            OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+            synchronized (this) {
+                pendingOffsetSyncs.put(topicPartition, offsetSync);
             }
+            partitionState.reset();
         }
     }
 
-    // sends OffsetSync record upstream to internal offsets topic
-    private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
-            long downstreamOffset) {
-        if (!outstandingOffsetSyncs.tryAcquire()) {
-            // Too many outstanding offset syncs.
-            return false;
+    private void firePendingOffsetSyncs() {
+        while (true) {
+            OffsetSync pendingOffsetSync;
+            synchronized (this) {
+                Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+                if (!syncIterator.hasNext()) {
+                    // Nothing to sync
+                    log.trace("No more pending offset syncs");
+                    return;
+                }
+                pendingOffsetSync = syncIterator.next();
+                if (!outstandingOffsetSyncs.tryAcquire()) {
+                    // Too many outstanding syncs
+                    log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
+                    return;
+                }
+                syncIterator.remove();
+            }
+            // Publish offset sync outside of synchronized block; we may have to
+            // wait for producer metadata to update before Producer::send returns
+            sendOffsetSync(pendingOffsetSync);
+            log.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition());
         }
-        OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+    }
+
+    // sends OffsetSync record to internal offsets topic
+    private void sendOffsetSync(OffsetSync offsetSync) {
         ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
                 offsetSync.recordKey(), offsetSync.recordValue());
         offsetProducer.send(record, (x, e) -> {
             if (e != null) {
                 log.error("Failure sending offset sync.", e);
             } else {
-                log.trace("Sync'd offsets for {}: {}=={}", topicPartition,
-                    upstreamOffset, downstreamOffset);
+                log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(),
+                    offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
             }
             outstandingOffsetSyncs.release();
         });
-        return true;
     }
  
     private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 9dfcf807ed2..b309df79fd9 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -283,7 +283,11 @@ public class MirrorSourceTaskTest {
         });
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(1)).send(any(), any());
 
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
         verify(producer, times(1)).send(any(), any());
 
         recordOffset = 2;
@@ -297,7 +301,11 @@ public class MirrorSourceTaskTest {
         doReturn(null).when(producer).send(any(), producerCallback.capture());
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(2)).send(any(), any());
 
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
         verify(producer, times(2)).send(any(), any());
 
         // Do not send sync event
@@ -309,22 +317,35 @@ public class MirrorSourceTaskTest {
                 recordValue.length, recordKey, recordValue, headers, Optional.empty()));
 
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        mirrorSourceTask.commit();
 
+        // We should not have dispatched any more syncs to the producer; there were too many already in flight
         verify(producer, times(2)).send(any(), any());
 
+        // Now the in-flight sync has been ack'd
+        producerCallback.getValue().onCompletion(null, null);
+        mirrorSourceTask.commit();
+        // We should dispatch the offset sync that was queued but previously not sent to the producer now
+        verify(producer, times(3)).send(any(), any());
+
+        // Ack the latest sync immediately
+        producerCallback.getValue().onCompletion(null, null);
+
         // Should send sync event
-        recordOffset = 5;
-        metadataOffset = 150;
+        recordOffset = 6;
+        metadataOffset = 106;
         recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
         sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
                 recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
                 recordValue.length, recordKey, recordValue, headers, Optional.empty()));
 
-        producerCallback.getValue().onCompletion(null, null);
-
         mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+        // We should have dispatched this sync to the producer
+        verify(producer, times(4)).send(any(), any());
 
-        verify(producer, times(3)).send(any(), any());
+        mirrorSourceTask.commit();
+        // No more syncs should take place; we've been able to publish all of them so far
+        verify(producer, times(4)).send(any(), any());
     }
 
     private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
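
The change above replaces the earlier "send immediately or drop" behaviour with a queue-then-dispatch pattern: syncs are staged in a LinkedHashMap keyed by partition (so a newer sync overwrites an older one for the same partition), a Semaphore caps how many sends may be in flight, and whatever cannot be dispatched right away is retried from the task's commit() hook. Below is a minimal standalone sketch of that pattern; the class and member names (PendingSyncDispatcher, queue, firePending, sender) are hypothetical stand-ins, not the actual MirrorSourceTask API.

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

public class PendingSyncDispatcher<K, V> {
    // Staged values, keyed so a newer value replaces an older one for the same key
    private final Map<K, V> pending = new LinkedHashMap<>();
    // Bounds how many sends may be outstanding (un-acked) at once
    private final Semaphore outstanding;
    // Sends a value and invokes the Runnable once the send is acknowledged
    private final BiConsumer<V, Runnable> sender;

    public PendingSyncDispatcher(int maxOutstanding, BiConsumer<V, Runnable> sender) {
        this.outstanding = new Semaphore(maxOutstanding);
        this.sender = sender;
    }

    // Hot path: stage the latest value for this key, then try to flush immediately
    public void queue(K key, V value) {
        synchronized (this) {
            pending.put(key, value);
        }
        firePending();
    }

    // Periodic path (the task's commit() hook in the patch): drain the backlog
    public void firePending() {
        while (true) {
            V next;
            synchronized (this) {
                Iterator<V> it = pending.values().iterator();
                if (!it.hasNext()) {
                    return; // nothing queued
                }
                next = it.next();
                if (!outstanding.tryAcquire()) {
                    return; // too many in flight; leave the rest queued for later
                }
                it.remove();
            }
            // Send outside the lock; the ack callback frees a permit so a later
            // call to firePending() can dispatch the next queued value
            sender.accept(next, outstanding::release);
        }
    }
}

In the patch itself, the sender role is played by the offset-sync producer, whose send callback releases the permit, and commit() simply calls firePendingOffsetSyncs() so that syncs previously blocked by the in-flight limit get another chance to be published.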