You are viewing a plain text version of this content; the hyperlink to the canonical version is not preserved in this plain-text rendering (see the original archived message for the canonical link).
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 13:19:09 UTC
[kafka] 01/04: KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 55e69a0db81713f700635f96cce56351abfe8ba3
Author: emilnkrastev <em...@gmail.com>
AuthorDate: Tue Jan 10 16:46:25 2023 +0200
KAFKA-12558: Do not prematurely mutate internal partition state in Mirror Maker 2 (#11818)
Reviewers: Greg Harris <gr...@aiven.io>, Chris Egerton <ch...@aiven.io>
---
.../kafka/connect/mirror/MirrorSourceTask.java | 22 +++-
.../kafka/connect/mirror/MirrorSourceTaskTest.java | 139 ++++++++++++++++++++-
.../MirrorConnectorsIntegrationBaseTest.java | 6 +-
3 files changed, 156 insertions(+), 11 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index da4697ddf25..5635eb7189d 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -69,7 +69,9 @@ public class MirrorSourceTask extends SourceTask {
// for testing
MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorSourceMetrics metrics, String sourceClusterAlias,
- ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer) {
+ ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer,
+ Semaphore outstandingOffsetSyncs, Map<TopicPartition, PartitionState> partitionStates,
+ String offsetSyncsTopic) {
this.consumer = consumer;
this.metrics = metrics;
this.sourceClusterAlias = sourceClusterAlias;
@@ -77,6 +79,9 @@ public class MirrorSourceTask extends SourceTask {
this.maxOffsetLag = maxOffsetLag;
consumerAccess = new Semaphore(1);
this.offsetProducer = producer;
+ this.outstandingOffsetSyncs = outstandingOffsetSyncs;
+ this.partitionStates = partitionStates;
+ this.offsetSyncsTopic = offsetSyncsTopic;
}
@Override
@@ -198,16 +203,18 @@ public class MirrorSourceTask extends SourceTask {
PartitionState partitionState =
partitionStates.computeIfAbsent(topicPartition, x -> new PartitionState(maxOffsetLag));
if (partitionState.update(upstreamOffset, downstreamOffset)) {
- sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset);
+ if (sendOffsetSync(topicPartition, upstreamOffset, downstreamOffset)) {
+ partitionState.reset();
+ }
}
}
// sends OffsetSync record upstream to internal offsets topic
- private void sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
+ private boolean sendOffsetSync(TopicPartition topicPartition, long upstreamOffset,
long downstreamOffset) {
if (!outstandingOffsetSyncs.tryAcquire()) {
// Too many outstanding offset syncs.
- return;
+ return false;
}
OffsetSync offsetSync = new OffsetSync(topicPartition, upstreamOffset, downstreamOffset);
ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(offsetSyncsTopic, 0,
@@ -221,6 +228,7 @@ public class MirrorSourceTask extends SourceTask {
}
outstandingOffsetSyncs.release();
});
+ return true;
}
private Map<TopicPartition, Long> loadOffsets(Set<TopicPartition> topicPartitions) {
@@ -272,6 +280,7 @@ public class MirrorSourceTask extends SourceTask {
long lastSyncUpstreamOffset = -1L;
long lastSyncDownstreamOffset = -1L;
long maxOffsetLag;
+ boolean shouldSyncOffsets;
PartitionState(long maxOffsetLag) {
this.maxOffsetLag = maxOffsetLag;
@@ -279,7 +288,6 @@ public class MirrorSourceTask extends SourceTask {
// true if we should emit an offset sync
boolean update(long upstreamOffset, long downstreamOffset) {
- boolean shouldSyncOffsets = false;
long upstreamStep = upstreamOffset - lastSyncUpstreamOffset;
long downstreamTargetOffset = lastSyncDownstreamOffset + upstreamStep;
if (lastSyncDownstreamOffset == -1L
@@ -294,5 +302,9 @@ public class MirrorSourceTask extends SourceTask {
previousDownstreamOffset = downstreamOffset;
return shouldSyncOffsets;
}
+
+ void reset() {
+ shouldSyncOffsets = false;
+ }
}
}
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index 14cf4143c90..9dfcf807ed2 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -19,26 +19,37 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.connect.mirror.MirrorSourceTask.PartitionState;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.Semaphore;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verifyNoInteractions;
@@ -56,7 +67,7 @@ public class MirrorSourceTaskTest {
@SuppressWarnings("unchecked")
KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
- new DefaultReplicationPolicy(), 50, producer);
+ new DefaultReplicationPolicy(), 50, producer, null, null, null);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
assertEquals("cluster7.topic1", sourceRecord.topic(),
"Failure on cluster7.topic1 consumerRecord serde");
@@ -81,15 +92,33 @@ public class MirrorSourceTaskTest {
MirrorSourceTask.PartitionState partitionState = new MirrorSourceTask.PartitionState(50);
assertTrue(partitionState.update(0, 100), "always emit offset sync on first update");
+ assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+ partitionState.reset();
+ assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
assertTrue(partitionState.update(2, 102), "upstream offset skipped -> resync");
+ partitionState.reset();
assertFalse(partitionState.update(3, 152), "no sync");
+ partitionState.reset();
assertFalse(partitionState.update(4, 153), "no sync");
+ partitionState.reset();
assertFalse(partitionState.update(5, 154), "no sync");
+ partitionState.reset();
assertTrue(partitionState.update(6, 205), "one past target offset");
+ partitionState.reset();
assertTrue(partitionState.update(2, 206), "upstream reset");
+ partitionState.reset();
assertFalse(partitionState.update(3, 207), "no sync");
+ partitionState.reset();
assertTrue(partitionState.update(4, 3), "downstream reset");
+ partitionState.reset();
assertFalse(partitionState.update(5, 4), "no sync");
+ assertTrue(partitionState.update(7, 6), "sync");
+ assertTrue(partitionState.update(7, 6), "sync");
+ assertTrue(partitionState.update(8, 7), "sync");
+ assertTrue(partitionState.update(10, 57), "sync");
+ partitionState.reset();
+ assertFalse(partitionState.update(11, 58), "sync");
+ assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
}
@Test
@@ -98,15 +127,32 @@ public class MirrorSourceTaskTest {
// if max offset lag is zero, should always emit offset syncs
assertTrue(partitionState.update(0, 100), "zeroOffsetSync downStreamOffset 100 is incorrect");
+ assertTrue(partitionState.shouldSyncOffsets, "should sync offsets");
+ partitionState.reset();
+ assertFalse(partitionState.shouldSyncOffsets, "should sync offsets to false");
assertTrue(partitionState.update(2, 102), "zeroOffsetSync downStreamOffset 102 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(3, 153), "zeroOffsetSync downStreamOffset 153 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(4, 154), "zeroOffsetSync downStreamOffset 154 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(5, 155), "zeroOffsetSync downStreamOffset 155 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(6, 207), "zeroOffsetSync downStreamOffset 207 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(2, 208), "zeroOffsetSync downStreamOffset 208 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(3, 209), "zeroOffsetSync downStreamOffset 209 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(4, 3), "zeroOffsetSync downStreamOffset 3 is incorrect");
+ partitionState.reset();
assertTrue(partitionState.update(5, 4), "zeroOffsetSync downStreamOffset 4 is incorrect");
+ assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+ assertTrue(partitionState.update(7, 6), "zeroOffsetSync downStreamOffset 6 is incorrect");
+ assertTrue(partitionState.update(8, 7), "zeroOffsetSync downStreamOffset 7 is incorrect");
+ assertTrue(partitionState.update(10, 57), "zeroOffsetSync downStreamOffset 57 is incorrect");
+ partitionState.reset();
+ assertTrue(partitionState.update(11, 58), "zeroOffsetSync downStreamOffset 58 is incorrect");
}
@Test
@@ -140,7 +186,7 @@ public class MirrorSourceTaskTest {
String sourceClusterName = "cluster1";
ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
- replicationPolicy, 50, producer);
+ replicationPolicy, 50, producer, null, null, null);
List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
assertEquals(2, sourceRecords.size());
@@ -186,7 +232,7 @@ public class MirrorSourceTaskTest {
String sourceClusterName = "cluster1";
ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
- replicationPolicy, 50, producer);
+ replicationPolicy, 50, producer, null, null, null);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
@@ -196,6 +242,91 @@ public class MirrorSourceTaskTest {
verifyNoInteractions(producer);
}
+ @Test
+ public void testSendSyncEvent() {
+ byte[] recordKey = "key".getBytes();
+ byte[] recordValue = "value".getBytes();
+ int maxOffsetLag = 50;
+ int recordPartition = 0;
+ int recordOffset = 0;
+ int metadataOffset = 100;
+ String topicName = "topic";
+ String sourceClusterName = "sourceCluster";
+
+ RecordHeaders headers = new RecordHeaders();
+ ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+
+ @SuppressWarnings("unchecked")
+ KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+ @SuppressWarnings("unchecked")
+ KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+ MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);
+ Semaphore outstandingOffsetSyncs = new Semaphore(1);
+ PartitionState partitionState = new PartitionState(maxOffsetLag);
+ Map<TopicPartition, PartitionState> partitionStates = new HashMap<>();
+
+ MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
+ replicationPolicy, maxOffsetLag, producer, outstandingOffsetSyncs, partitionStates, topicName);
+
+ SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+ recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+ recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+ TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(sourceRecord.sourcePartition());
+ partitionStates.put(sourceTopicPartition, partitionState);
+ RecordMetadata recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+
+ ArgumentCaptor<Callback> producerCallback = ArgumentCaptor.forClass(Callback.class);
+ when(producer.send(any(), producerCallback.capture())).thenAnswer(mockInvocation -> {
+ producerCallback.getValue().onCompletion(null, null);
+ return null;
+ });
+
+ mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+ verify(producer, times(1)).send(any(), any());
+
+ recordOffset = 2;
+ metadataOffset = 102;
+ recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+ sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+ recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+ recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+ // Do not release outstanding sync semaphore
+ doReturn(null).when(producer).send(any(), producerCallback.capture());
+
+ mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+ verify(producer, times(2)).send(any(), any());
+
+ // Do not send sync event
+ recordOffset = 4;
+ metadataOffset = 104;
+ recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+ sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+ recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+ recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+ mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+ verify(producer, times(2)).send(any(), any());
+
+ // Should send sync event
+ recordOffset = 5;
+ metadataOffset = 150;
+ recordMetadata = new RecordMetadata(sourceTopicPartition, metadataOffset, 0, 0, 0, recordPartition);
+ sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, recordPartition,
+ recordOffset, System.currentTimeMillis(), TimestampType.CREATE_TIME, recordKey.length,
+ recordValue.length, recordKey, recordValue, headers, Optional.empty()));
+
+ producerCallback.getValue().onCompletion(null, null);
+
+ mirrorSourceTask.commitRecord(sourceRecord, recordMetadata);
+
+ verify(producer, times(3)).send(any(), any());
+ }
+
private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
assertEquals(expectedHeaders.size(), taskHeaders.size());
for (int i = 0; i < expectedHeaders.size(); i++) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
index 9b744cb3e99..5ca9b110707 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java
@@ -305,8 +305,10 @@ public class MirrorConnectorsIntegrationBaseTest {
Map<TopicPartition, OffsetAndMetadata> backupOffsets = backupClient.remoteConsumerOffsets(consumerGroupName, PRIMARY_CLUSTER_ALIAS,
Duration.ofMillis(CHECKPOINT_DURATION_MS));
- assertTrue(backupOffsets.containsKey(
- new TopicPartition("primary.test-topic-1", 0)), "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
+ assertTrue(backupOffsets.containsKey(new TopicPartition("primary.test-topic-1", i)),
+ "Offsets not translated downstream to backup cluster. Found: " + backupOffsets);
+ }
// Failover consumer group to backup cluster.
try (Consumer<byte[], byte[]> primaryConsumer = backup.kafka().createConsumer(Collections.singletonMap("group.id", consumerGroupName))) {