You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/09/15 13:56:12 UTC
[kafka] branch trunk updated: KAFKA-13985: Skip committing MirrorSourceTask records without metadata (#12602)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b09cadcaa7 KAFKA-13985: Skip committing MirrorSourceTask records without metadata (#12602)
b09cadcaa7 is described below
commit b09cadcaa7fd0eaea32ce1fe2e22fefcbf1079ee
Author: Rens Groothuijsen <l....@alumni.maastrichtuniversity.nl>
AuthorDate: Thu Sep 15 15:55:44 2022 +0200
KAFKA-13985: Skip committing MirrorSourceTask records without metadata (#12602)
Reviewers: Chris Egerton <ch...@aiven.io>
---
.../kafka/connect/mirror/MirrorSourceTask.java | 39 +++++++++++----------
.../kafka/connect/mirror/MirrorSourceTaskTest.java | 40 ++++++++++++++++++++--
2 files changed, 58 insertions(+), 21 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index ec1e15cda7..d3f884a55a 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -69,13 +69,14 @@ public class MirrorSourceTask extends SourceTask {
// for testing
MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorMetrics metrics, String sourceClusterAlias,
- ReplicationPolicy replicationPolicy, long maxOffsetLag) {
+ ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer) {
this.consumer = consumer;
this.metrics = metrics;
this.sourceClusterAlias = sourceClusterAlias;
this.replicationPolicy = replicationPolicy;
this.maxOffsetLag = maxOffsetLag;
consumerAccess = new Semaphore(1);
+ this.offsetProducer = producer;
}
@Override
@@ -170,25 +171,25 @@ public class MirrorSourceTask extends SourceTask {
@Override
public void commitRecord(SourceRecord record, RecordMetadata metadata) {
- try {
- if (stopping) {
- return;
- }
- if (!metadata.hasOffset()) {
- log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
- return;
- }
- TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
- long latency = System.currentTimeMillis() - record.timestamp();
- metrics.countRecord(topicPartition);
- metrics.replicationLatency(topicPartition, latency);
- TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
- long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
- long downstreamOffset = metadata.offset();
- maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
- } catch (Throwable e) {
- log.warn("Failure committing record.", e);
+ if (stopping) {
+ return;
+ }
+ if (metadata == null) {
+ log.debug("No RecordMetadata (source record was probably filtered out during transformation) -- can't sync offsets for {}.", record.topic());
+ return;
+ }
+ if (!metadata.hasOffset()) {
+ log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
+ return;
}
+ TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
+ long latency = System.currentTimeMillis() - record.timestamp();
+ metrics.countRecord(topicPartition);
+ metrics.replicationLatency(topicPartition, latency);
+ TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
+ long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
+ long downstreamOffset = metadata.offset();
+ maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
}
// updates partition state and sends OffsetSync if necessary
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index feb2f7fb6b..4175013b41 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.record.TimestampType;
@@ -39,6 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verifyNoInteractions;
public class MirrorSourceTaskTest {
@@ -51,8 +53,10 @@ public class MirrorSourceTaskTest {
headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
TimestampType.CREATE_TIME, 5, 6, key, value, headers, Optional.empty());
+ @SuppressWarnings("unchecked")
+ KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
- new DefaultReplicationPolicy(), 50);
+ new DefaultReplicationPolicy(), 50, producer);
SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
assertEquals("cluster7.topic1", sourceRecord.topic(),
"Failure on cluster7.topic1 consumerRecord serde");
@@ -127,6 +131,8 @@ public class MirrorSourceTaskTest {
@SuppressWarnings("unchecked")
KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+ @SuppressWarnings("unchecked")
+ KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
when(consumer.poll(any())).thenReturn(consumerRecords);
MirrorMetrics metrics = mock(MirrorMetrics.class);
@@ -134,7 +140,7 @@ public class MirrorSourceTaskTest {
String sourceClusterName = "cluster1";
ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
- replicationPolicy, 50);
+ replicationPolicy, 50, producer);
List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
assertEquals(2, sourceRecords.size());
@@ -160,6 +166,36 @@ public class MirrorSourceTaskTest {
}
}
+ @Test
+ public void testCommitRecordWithNullMetadata() {
+ // Create a consumer mock
+ byte[] key1 = "abc".getBytes();
+ byte[] value1 = "fgh".getBytes();
+ String topicName = "test";
+ String headerKey = "key";
+ RecordHeaders headers = new RecordHeaders(new Header[] {
+ new RecordHeader(headerKey, "value".getBytes()),
+ });
+
+ @SuppressWarnings("unchecked")
+ KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+ @SuppressWarnings("unchecked")
+ KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+ MirrorMetrics metrics = mock(MirrorMetrics.class);
+
+ String sourceClusterName = "cluster1";
+ ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+ MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
+ replicationPolicy, 50, producer);
+
+ SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
+ TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
+
+ // Expect that commitRecord will not throw an exception
+ mirrorSourceTask.commitRecord(sourceRecord, null);
+ verifyNoInteractions(producer);
+ }
+
private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
assertEquals(expectedHeaders.size(), taskHeaders.size());
for (int i = 0; i < expectedHeaders.size(); i++) {