You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink is not preserved in this plain-text rendering).
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 13:19:10 UTC
[kafka] 02/04: KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit a4b33bd0a5a757298d8a910f013c8b4ae24835c2
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Mon Feb 6 04:53:58 2023 -0500
KAFKA-14610: Publish Mirror Maker 2 offset syncs in task commit() method (#13181)
Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gh...@gmail.com>
---
.../kafka/connect/mirror/MirrorSourceTask.java | 62 ++++++++++++++++------
.../kafka/connect/mirror/MirrorSourceTaskTest.java | 31 +++++++++--
2 files changed, 71 insertions(+), 22 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index 5635eb7189d..09de13aff34 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -36,6 +36,8 @@ import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.HashMap;
import java.util.List;
@@ -62,6 +64,7 @@ public class MirrorSourceTask extends SourceTask {
private ReplicationPolicy replicationPolicy;
private MirrorSourceMetrics metrics;
private boolean stopping = false;
+ private final Map<TopicPartition, OffsetSync> pendingOffsetSyncs = new LinkedHashMap<>();
private Semaphore outstandingOffsetSyncs;
private Semaphore consumerAccess;
@@ -87,6 +90,7 @@ public class MirrorSourceTask extends SourceTask {
@Override
public void start(Map<String, String> props) {
MirrorSourceTaskConfig config = new MirrorSourceTaskConfig(props);
+ pendingOffsetSyncs.clear();
outstandingOffsetSyncs = new Semaphore(MAX_OUTSTANDING_OFFSET_SYNCS);
consumerAccess = new Semaphore(1); // let one thread at a time access the consumer
sourceClusterAlias = config.sourceClusterAlias();
@@ -111,7 +115,9 @@ public class MirrorSourceTask extends SourceTask {
@Override
public void commit() {
- // nop
+ // Publish any offset syncs that we've queued up, but have not yet been able to publish
+ // (likely because we previously reached our limit for number of outstanding syncs)
+ firePendingOffsetSyncs();
}
@Override
@@ -194,41 +200,63 @@ public class MirrorSourceTask extends SourceTask {
TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
long downstreamOffset = metadata.offset();
- maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
+ maybeQueueOffsetSyncs(sourceTopicPartition, upstreamOffset, downstreamOffset);
+ // We may be able to immediately publish an offset sync that we've queued up here
+ firePendingOffsetSyncs();
}
- // updates partition state and sends OffsetSync if necessary
- private void maybeSyncOffsets(TopicPartition topicPartition, long upstreamOffset,
- long downstreamOffset) {
+ // updates partition state and queues up OffsetSync if necessary
+ private void maybeQueueOffsetSyncs(TopicPartition topicPartition, long upstreamOffset,
+ long downstreamOffset) {
PartitionState partitionState =
partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
if (partitionState.update(upstreamOffset, downstreamOffset)) {
- if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
- partitionState.reset();
+ OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+ synchronized (this) {
+ pendingOffsetSyncs.put(topicPartition, offsetSync);
}
+ partitionState.reset();
}
}
- // sends OffsetSync record upstream to internal offsets topic
- private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
- long downstreamOffset) {
- if (!outstandingOffsetSyncs.tryAcquire()) {
- // Too many outstanding offset syncs.
- return false;
+ private void firePendingOffsetSyncs() {
+ while (true) {
+ OffsetSync pendingOffsetSync;
+ synchronized (this) {
+ Iterator<OffsetSync> syncIterator = pendingOffsetSyncs.values().iterator();
+ if (!syncIterator.hasNext()) {
+ // Nothing to sync
+ log.trace("No more pending offset syncs");
+ return;
+ }
+ pendingOffsetSync = syncIterator.next();
+ if (!outstandingOffsetSyncs.tryAcquire()) {
+ // Too many outstanding syncs
+ log.trace("Too many in-flight offset syncs; will try to send remaining offset syncs later");
+ return;
+ }
+ syncIterator.remove();
+ }
+ // Publish offset sync outside of synchronized block; we may have to
+ // wait for producer metadata to update before Producer::send returns
+ sendOffsetSync(pendingOffsetSync);
+ log.trace("Dispatched offset sync for {}", pendingOffsetSync.topicPartition());
}
- OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+ }
+
+ // sends OffsetSync record to internal offsets topic
+ private void sendOffsetSync(OffsetSync offsetSync) {
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
offsetSync.recordKey(), offsetSync.recordValue());
offsetProducer.send(record, (x, e) -> {
if (e != null) {
log.error("Failure sending offset sync.", e);
} else {
- log.trace("Sync'd offsets for {}: {}=={}", topicPartition,
- upstreamOffset, downstreamOffset);
+ log.trace("Sync'd offsets for {}: {}=={}", offsetSync.topicPartition(),
+ offsetSync.upstreamOffset(), offsetSync.downstreamOffset());
}
outstandingOffsetSyncs.release();
});
- return true;
}
private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 9dfcf807ed2..b309df79fd9 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -283,7 +283,11 @@ public class MirrorSourceTaskTest {
});
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+ // We should have dispatched this sync to the producer
+ verify(producer, times(1)).send(any(), any());
+ mirrorSourceTask.commit();
+ // No more syncs should take place; we've been able to publish all of them so far
verify(producer, times(1)).send(any(), any());
recordOffset = 2;
@@ -297,7 +301,11 @@ public class MirrorSourceTaskTest {
doReturn(null).when(producer).send(any(), producerCallback.capture());
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+ // We should have dispatched this sync to the producer
+ verify(producer, times(2)).send(any(), any());
+ mirrorSourceTask.commit();
+ // No more syncs should take place; we've been able to publish all of them so far
verify(producer, times(2)).send(any(), any());
// Do not send sync event
@@ -309,22 +317,35 @@ public class MirrorSourceTaskTest {
recordValue.length, recordKey, recordValue, headers, Optional.empty()));
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+ mirrorSourceTask.commit();
+ // We should not have dispatched any more syncs to the producer; there were too many already in flight
verify(producer, times(2)).send(any(), any());
+ // Now the in-flight sync has been ack'd
+ producerCallback.getValue().onCompletion(null, null);
+ mirrorSourceTask.commit();
+ // We should dispatch the offset sync that was queued but previously not sent to the producer now
+ verify(producer, times(3)).send(any(), any());
+
+ // Ack the latest sync immediately
+ producerCallback.getValue().onCompletion(null, null);
+
// Should send sync event
- recordOffset = 5;
- metadataOffset = 150;
+ recordOffset = 6;
+ metadataOffset = 106;
recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
recordValue.length, recordKey, recordValue, headers, Optional.empty()));
- producerCallback.getValue().onCompletion(null, null);
-
mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+ // We should have dispatched this sync to the producer
+ verify(producer, times(4)).send(any(), any());
- verify(producer, times(3)).send(any(), any());
+ mirrorSourceTask.commit();
+ // No more syncs should take place; we've been able to publish all of them so far
+ verify(producer, times(4)).send(any(), any());
}
private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {