You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:46 UTC
[pulsar] 16/17: [fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1a7a157df8971618c42d2e83ddab1c5c2c81c179
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Mon Jun 6 20:49:17 2022 +0200
[fix][connector] KCA connectors: fix offset mapping when sanitizeTopicName=true (#15950)
(cherry picked from commit 49ee8a6bf4571d39adf0e942fc6bb04d9daa1290)
---
.../java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java | 6 +++---
.../org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java | 3 +++
2 files changed, 6 insertions(+), 3 deletions(-)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index 232d8c092ac..502154065d9 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -243,7 +243,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
@SuppressWarnings("rawtypes")
protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
final int partition = sourceRecord.getPartitionIndex().orElse(0);
- final String topic = sourceRecord.getTopicName().orElse(topicName);
+ final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
final Object key;
final Object value;
final Schema keySchema;
@@ -290,7 +290,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
// keep timestampType = TimestampType.NO_TIMESTAMP_TYPE
timestamp = sourceRecord.getMessage().get().getPublishTime();
}
- return new SinkRecord(sanitizeNameIfNeeded(topic, sanitizeTopicName),
+ return new SinkRecord(topic,
partition,
keySchema,
key,
@@ -303,7 +303,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
@VisibleForTesting
protected long currentOffset(String topic, int partition) {
- return taskContext.currentOffset(topic, partition);
+ return taskContext.currentOffset(sanitizeNameIfNeeded(topic, sanitizeTopicName), partition);
}
// Replace all non-letter, non-digit characters with underscore.
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 8d08ebcef87..1fba098a228 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -74,6 +74,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -197,6 +198,8 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
sink.write(record);
sink.flush();
+ assertTrue(sink.currentOffset("persistent://a-b/c-d/fake-topic.a", 0) > 0L);
+
assertEquals(status.get(), 1);
assertEquals(resultCaptor.getResult().topic(), "persistent___a_b_c_d_fake_topic_a");