You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:46 UTC

[pulsar] 16/17: [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1a7a157df8971618c42d2e83ddab1c5c2c81c179
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon Jun 6 20:49:17 2022 +0200

    [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
    
    (cherry picked from commit 49ee8a6bf4571d39adf0e942fc6bb04d9daa1290)
---
 .../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java   | 6 +++---
 .../org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java    | 3 +++
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 232d8c092ac..502154065d9 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -243,7 +243,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
         final int partition = sourceRecord.getPartitionIndex().orElse(0);
-        final String topic = sourceRecord.getTopicName().orElse(topicName);
+        final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
         final Object key;
         final Object value;
         final Schema keySchema;
@@ -290,7 +290,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
             // keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
             timestamp = sourceRecord.getMessage().get().getPublishTime();
         }
-        return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
+        return new SinkRecord(topic,
                 partition,
                 keySchema,
                 key,
@@ -303,7 +303,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
 
     @VisibleForTesting
     protected long currentOffset(String topic, int partition) {
-        return taskContext.currentOffset(topic, partition);
+        return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
     }
 
     // Replace all non-letter, non-digit characters with underscore.
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 8d08ebcef87..1fba098a228 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -74,6 +74,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
@@ -197,6 +198,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase  {
         sink.write(record);
         sink.flush();
 
+        assertTrue(sink.currentOffset("persistent://a-b/c-d/fake-topic.a", 0) > 0L);
+
         assertEquals(status.get(), 1);
         assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");