You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/09/15 13:56:12 UTC

[kafka] branch trunk updated: KAFKA-13985: Skip committing MirrorSourceTask records without metadata (#12602)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b09cadcaa7 KAFKA-13985: Skip committing MirrorSourceTask records without metadata (#12602)
b09cadcaa7 is described below

commit b09cadcaa7fd0eaea32ce1fe2e22fefcbf1079ee
Author: Rens Groothuijsen <l....@alumni.maastrichtuniversity.nl>
AuthorDate: Thu Sep 15 15:55:44 2022 +0200

    KAFKA-13985: Skip committing MirrorSourceTask records without metadata (#12602)
    
    Reviewers: Chris Egerton <ch...@aiven.io>
---
 .../kafka/connect/mirror/MirrorSourceTask.java     | 39 +++++++++++----------
 .../kafka/connect/mirror/MirrorSourceTaskTest.java | 40 ++++++++++++++++++++--
 2 files changed, 58 insertions(+), 21 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
index ec1e15cda7..d3f884a55a 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceTask.java
@@ -69,13 +69,14 @@ public class MirrorSourceTask extends SourceTask {
 
     // for testing
     MirrorSourceTask(KafkaConsumer<byte[], byte[]> consumer, MirrorMetrics metrics, String sourceClusterAlias,
-                     ReplicationPolicy replicationPolicy, long maxOffsetLag) {
+                     ReplicationPolicy replicationPolicy, long maxOffsetLag, KafkaProducer<byte[], byte[]> producer) {
         this.consumer = consumer;
         this.metrics = metrics;
         this.sourceClusterAlias = sourceClusterAlias;
         this.replicationPolicy = replicationPolicy;
         this.maxOffsetLag = maxOffsetLag;
         consumerAccess = new Semaphore(1);
+        this.offsetProducer = producer;
     }
 
     @Override
@@ -170,25 +171,25 @@ public class MirrorSourceTask extends SourceTask {
  
     @Override
     public void commitRecord(SourceRecord record, RecordMetadata metadata) {
-        try {
-            if (stopping) {
-                return;
-            }
-            if (!metadata.hasOffset()) {
-                log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
-                return;
-            }
-            TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
-            long latency = System.currentTimeMillis() - record.timestamp();
-            metrics.countRecord(topicPartition);
-            metrics.replicationLatency(topicPartition, latency);
-            TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
-            long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
-            long downstreamOffset = metadata.offset();
-            maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
-        } catch (Throwable e) {
-            log.warn("Failure committing record.", e);
+        if (stopping) {
+            return;
+        }
+        if (metadata == null) {
+            log.debug("No RecordMetadata (source record was probably filtered out during transformation) -- can't sync offsets for {}.", record.topic());
+            return;
+        }
+        if (!metadata.hasOffset()) {
+            log.error("RecordMetadata has no offset -- can't sync offsets for {}.", record.topic());
+            return;
         }
+        TopicPartition topicPartition = new TopicPartition(record.topic(), record.kafkaPartition());
+        long latency = System.currentTimeMillis() - record.timestamp();
+        metrics.countRecord(topicPartition);
+        metrics.replicationLatency(topicPartition, latency);
+        TopicPartition sourceTopicPartition = MirrorUtils.unwrapPartition(record.sourcePartition());
+        long upstreamOffset = MirrorUtils.unwrapOffset(record.sourceOffset());
+        long downstreamOffset = metadata.offset();
+        maybeSyncOffsets(sourceTopicPartition, upstreamOffset, downstreamOffset);
     }
 
     // updates partition state and sends OffsetSync if necessary
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
index feb2f7fb6b..4175013b41 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.mirror;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
 import org.apache.kafka.common.record.TimestampType;
@@ -39,6 +40,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verifyNoInteractions;
 
 public class MirrorSourceTaskTest {
 
@@ -51,8 +53,10 @@ public class MirrorSourceTaskTest {
         headers.add("header2", new byte[]{'p', 'q', 'r', 's', 't'});
         ConsumerRecord<byte[], byte[]> consumerRecord = new ConsumerRecord<>("topic1", 2, 3L, 4L,
             TimestampType.CREATE_TIME, 5, 6, key, value, headers, Optional.empty());
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(null, null, "cluster7",
-                new DefaultReplicationPolicy(), 50);
+                new DefaultReplicationPolicy(), 50, producer);
         SourceRecord sourceRecord = mirrorSourceTask.convertRecord(consumerRecord);
         assertEquals("cluster7.topic1", sourceRecord.topic(),
                 "Failure on cluster7.topic1 consumerRecord serde");
@@ -127,6 +131,8 @@ public class MirrorSourceTaskTest {
 
         @SuppressWarnings("unchecked")
         KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
         when(consumer.poll(any())).thenReturn(consumerRecords);
 
         MirrorMetrics metrics = mock(MirrorMetrics.class);
@@ -134,7 +140,7 @@ public class MirrorSourceTaskTest {
         String sourceClusterName = "cluster1";
         ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
         MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
-                replicationPolicy, 50);
+                replicationPolicy, 50, producer);
         List<SourceRecord> sourceRecords = mirrorSourceTask.poll();
 
         assertEquals(2, sourceRecords.size());
@@ -160,6 +166,36 @@ public class MirrorSourceTaskTest {
         }
     }
 
+    @Test
+    public void testCommitRecordWithNullMetadata() {
+        // Create a consumer mock
+        byte[] key1 = "abc".getBytes();
+        byte[] value1 = "fgh".getBytes();
+        String topicName = "test";
+        String headerKey = "key";
+        RecordHeaders headers = new RecordHeaders(new Header[] {
+            new RecordHeader(headerKey, "value".getBytes()),
+        });
+
+        @SuppressWarnings("unchecked")
+        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
+        @SuppressWarnings("unchecked")
+        KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
+        MirrorMetrics metrics = mock(MirrorMetrics.class);
+
+        String sourceClusterName = "cluster1";
+        ReplicationPolicy replicationPolicy = new DefaultReplicationPolicy();
+        MirrorSourceTask mirrorSourceTask = new MirrorSourceTask(consumer, metrics, sourceClusterName,
+                replicationPolicy, 50, producer);
+
+        SourceRecord sourceRecord = mirrorSourceTask.convertRecord(new ConsumerRecord<>(topicName, 0, 0, System.currentTimeMillis(),
+                TimestampType.CREATE_TIME, key1.length, value1.length, key1, value1, headers, Optional.empty()));
+
+        // Expect that commitRecord will not throw an exception
+        mirrorSourceTask.commitRecord(sourceRecord, null);
+        verifyNoInteractions(producer);
+    }
+
     private void compareHeaders(List<Header> expectedHeaders, List<org.apache.kafka.connect.header.Header> taskHeaders) {
         assertEquals(expectedHeaders.size(), taskHeaders.size());
         for (int i = 0; i < expectedHeaders.size(); i++) {