You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 13:19:11 UTC

[kafka] 03/04: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 300779dee431a95471031a7aa27ffa72a4f89fba
Author: csolidum <ch...@gmail.com>
AuthorDate: Wed Jan 4 03:02:52 2023 -0800

    KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gh...@gmail.com>
---
 .../apache/kafka/connect/mirror/MirrorCheckpointTask.java  | 14 ++++++++------
 .../kafka/connect/mirror/MirrorCheckpointTaskTest.java     | 10 ++++++++++
 2 files changed, 18 insertions(+), 6 deletions(-)

diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 36f8adab5f8..95b7b5a2bda 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -189,14 +189,16 @@ public class MirrorCheckpointTask extends SourceTask {
 
     Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
                                     OffsetAndMetadata offsetAndMetadata) {
-        long upstreamOffset = offsetAndMetadata.offset();
-        OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
-        if (downstreamOffset.isPresent()) {
-            return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
+        if (offsetAndMetadata != null) {
+            long upstreamOffset = offsetAndMetadata.offset();
+            OptionalLong downstreamOffset =
+                offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+            if (downstreamOffset.isPresent()) {
+                return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
                     upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
-        } else {
-            return Optional.empty();
+            }
         }
+        return Optional.empty();
     }
 
     SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 54fe678e73a..20735cd2334 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -161,4 +161,14 @@ public class MirrorCheckpointTaskTest {
         assertFalse(checkpoint1.isPresent());
         assertTrue(checkpoint2.isPresent());
     }
+
+    @Test
+    public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
+        OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+        MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+            new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
+        offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
+        Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
+        assertFalse(checkpoint.isPresent());
+    }
 }