You are viewing a plain text version of this content. (The canonical link was not preserved in this plain-text copy.)
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/06/06 18:49:24 UTC

[pulsar] branch master updated: [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 49ee8a6bf45 [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
49ee8a6bf45 is described below

commit 49ee8a6bf4571d39adf0e942fc6bb04d9daa1290
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon Jun 6 20:49:17 2022 +0200

    [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
---
 .../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java   | 6 +++---
 .../org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java    | 3 +++
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 445a74b3e3b..31f7cbf6399 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -243,7 +243,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
-        final String topic = sourceRecord.getTopicName().orElse(topicName);
+        final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
         final Object key;
         final Object value;
         final Schema keySchema;
@@ -300,7 +300,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             // keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
             timestamp = sourceRecord.getMessage().get().getPublishTime();
         }
-        return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
+        return new SinkRecord(topic,
                 partition,
                 keySchema,
                 key,
@@ -313,7 +313,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
 
     @VisibleForTesting
     protected long currentOffset(String topic, int partition) {
-        return taskContext.currentOffset(topic, partition);
+        return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
     }
 
     // Replace all non-letter, non-digit characters with underscore.
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 1b767e22b52..23d9f1b5ce2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -78,6 +78,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -201,6 +202,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.write(record);
         sink.flush();
 
+        assertTrue(sink.currentOffset("persistent://a-b/c-d/fake-topic.a", 0) > 0L);
+
         assertEquals(status.get(), 1);
         assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");