You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/02/23 13:19:11 UTC
[kafka] 03/04: KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 300779dee431a95471031a7aa27ffa72a4f89fba
Author: csolidum <ch...@gmail.com>
AuthorDate: Wed Jan 4 03:02:52 2023 -0800
KAFKA-14545: Make MirrorCheckpointTask.checkpoint handle null OffsetAndMetadata gracefully (#13052)
Reviewers: Mickael Maison <mi...@gmail.com>, Greg Harris <gh...@gmail.com>
---
.../apache/kafka/connect/mirror/MirrorCheckpointTask.java | 14 ++++++++------
.../kafka/connect/mirror/MirrorCheckpointTaskTest.java | 10 ++++++++++
2 files changed, 18 insertions(+), 6 deletions(-)
diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
index 36f8adab5f8..95b7b5a2bda 100644
--- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
+++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
@@ -189,14 +189,16 @@ public class MirrorCheckpointTask extends SourceTask {
Optional<Checkpoint> checkpoint(String group, TopicPartition topicPartition,
OffsetAndMetadata offsetAndMetadata) {
- long upstreamOffset = offsetAndMetadata.offset();
- OptionalLong downstreamOffset = offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
- if (downstreamOffset.isPresent()) {
- return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
+ if (offsetAndMetadata != null) {
+ long upstreamOffset = offsetAndMetadata.offset();
+ OptionalLong downstreamOffset =
+ offsetSyncStore.translateDownstream(topicPartition, upstreamOffset);
+ if (downstreamOffset.isPresent()) {
+ return Optional.of(new Checkpoint(group, renameTopicPartition(topicPartition),
upstreamOffset, downstreamOffset.getAsLong(), offsetAndMetadata.metadata()));
- } else {
- return Optional.empty();
+ }
}
+ return Optional.empty();
}
SourceRecord checkpointRecord(Checkpoint checkpoint, long timestamp) {
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
index 54fe678e73a..20735cd2334 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorCheckpointTaskTest.java
@@ -161,4 +161,14 @@ public class MirrorCheckpointTaskTest {
assertFalse(checkpoint1.isPresent());
assertTrue(checkpoint2.isPresent());
}
+
+ @Test
+ public void testNoCheckpointForTopicWithNullOffsetAndMetadata() {
+ OffsetSyncStoreTest.FakeOffsetSyncStore offsetSyncStore = new OffsetSyncStoreTest.FakeOffsetSyncStore();
+ MirrorCheckpointTask mirrorCheckpointTask = new MirrorCheckpointTask("source1", "target2",
+ new DefaultReplicationPolicy(), offsetSyncStore, Collections.emptyMap(), Collections.emptyMap());
+ offsetSyncStore.sync(new TopicPartition("topic1", 0), 1L, 3L);
+ Optional<Checkpoint> checkpoint = mirrorCheckpointTask.checkpoint("g1", new TopicPartition("topic1", 0), null);
+ assertFalse(checkpoint.isPresent());
+ }
}