Posted to commits@pulsar.apache.org by ay...@apache.org on 2023/03/29 18:34:19 UTC

[pulsar] branch branch-2.11 updated: [improve][io] KCA: option to collapse partitioned topics (#19923)

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new b2f734f4594 [improve][io] KCA: option to collapse partitioned topics (#19923)
b2f734f4594 is described below

commit b2f734f45949131c9e5a4c36a57db026f3bafac7
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Tue Mar 28 23:54:05 2023 -0700

    [improve][io] KCA: option to collapse partitioned topics (#19923)
    
    (cherry picked from commit 7cb48fd9d41f8606d86a0503b0130aec6b9d00d5)
---
 pulsar-io/kafka-connect-adaptor/pom.xml            |  1 +
 .../pulsar/io/kafka/connect/KafkaConnectSink.java  | 20 ++++-
 .../connect/PulsarKafkaConnectSinkConfig.java      |  5 ++
 .../io/kafka/connect/KafkaConnectSinkTest.java     | 88 ++++++++++++++++++++++
 4 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml
index 24cf0543545..5e03308960f 100644
--- a/pulsar-io/kafka-connect-adaptor/pom.xml
+++ b/pulsar-io/kafka-connect-adaptor/pom.xml
@@ -42,6 +42,7 @@
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-io-common</artifactId>
       <version>${project.version}</version>
+      <scope>compile</scope>
     </dependency>
 
     <dependency>
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
index bbf6bdae4e7..0f2321eaaa4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java
@@ -55,6 +55,7 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.schema.KeyValue;
 import org.apache.pulsar.common.schema.SchemaType;
 import org.apache.pulsar.functions.api.Record;
@@ -92,6 +93,9 @@ public class KafkaConnectSink implements Sink<GenericObject> {
     protected String topicName;
 
     private boolean sanitizeTopicName = false;
+    // This is a workaround for https://github.com/apache/pulsar/issues/19922
+    private boolean collapsePartitionedTopics = false;
+
     private final Cache<String, String> sanitizedTopicCache =
             CacheBuilder.newBuilder().maximumSize(1000)
                     .expireAfterAccess(30, TimeUnit.MINUTES).build();
@@ -160,6 +164,7 @@ public class KafkaConnectSink implements Sink<GenericObject> {
         topicName = kafkaSinkConfig.getTopic();
         unwrapKeyValueIfAvailable = kafkaSinkConfig.isUnwrapKeyValueIfAvailable();
         sanitizeTopicName = kafkaSinkConfig.isSanitizeTopicName();
+        collapsePartitionedTopics = kafkaSinkConfig.isCollapsePartitionedTopics();
 
         useIndexAsOffset = kafkaSinkConfig.isUseIndexAsOffset();
         maxBatchBitsForOffset = kafkaSinkConfig.getMaxBatchBitsForOffset();
@@ -418,8 +423,19 @@ public class KafkaConnectSink implements Sink<GenericObject> {
 
     @SuppressWarnings("rawtypes")
     protected SinkRecord toSinkRecord(Record<GenericObject> sourceRecord) {
-        final int partition = sourceRecord.getPartitionIndex().orElse(0);
-        final String topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
+        final int partition;
+        final String topic;
+
+        if (collapsePartitionedTopics
+                && sourceRecord.getTopicName().isPresent()
+                && TopicName.get(sourceRecord.getTopicName().get()).isPartitioned()) {
+            TopicName tn = TopicName.get(sourceRecord.getTopicName().get());
+            partition = tn.getPartitionIndex();
+            topic = sanitizeNameIfNeeded(tn.getPartitionedTopicName(), sanitizeTopicName);
+        } else {
+            partition = sourceRecord.getPartitionIndex().orElse(0);
+            topic = sanitizeNameIfNeeded(sourceRecord.getTopicName().orElse(topicName), sanitizeTopicName);
+        }
         final Object key;
         final Object value;
         final Schema keySchema;
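
For reference, the new branch above leans on TopicName's handling of the "-partition-N" suffix. A minimal, illustrative sketch of what the collapsing yields (topic names taken from the tests below; not part of the commit itself):

    import org.apache.pulsar.common.naming.TopicName;

    // Illustrative only: how a partition topic name is collapsed.
    TopicName tn = TopicName.get("persistent://a/b/fake-topic-partition-1");
    boolean isPart = tn.isPartitioned();            // true
    int partition  = tn.getPartitionIndex();        // 1 -> becomes the Kafka partition
    String topic   = tn.getPartitionedTopicName();  // "persistent://a/b/fake-topic" -> becomes the Kafka topic

    // Non-partitioned names fall through to the existing behavior:
    TopicName.get("persistent://a/b/fake-topic").isPartitioned(); // false
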
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
index a6e8b5c966e..f15704de4d0 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaConnectSinkConfig.java
@@ -95,6 +95,11 @@ public class PulsarKafkaConnectSinkConfig implements Serializable {
                     + "In some cases it may result in topic name collisions (topic_a and topic.a will become the same)")
     private boolean sanitizeTopicName = false;
 
+    @FieldDoc(
+            defaultValue = "false",
+            help = "Supply the Kafka record with the topic name without the -partition-N suffix for partitioned topics.")
+    private boolean collapsePartitionedTopics = false;
+
     public static PulsarKafkaConnectSinkConfig load(String yamlFile) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(yamlFile), PulsarKafkaConnectSinkConfig.class);
diff --git a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
index 9d7cb030271..775b8f2bee2 100644
--- a/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
+++ b/pulsar-io/kafka-connect-adaptor/src/test/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSinkTest.java
@@ -1567,6 +1567,94 @@ public class KafkaConnectSinkTest extends ProducerConsumerBase {
         assertEquals(ref.getBatchIdx(), batchIdx);
     }
 
+    @Test
+    public void collapsePartitionedTopicEnabledTest() throws Exception {
+        testCollapsePartitionedTopic(true,
+                "persistent://a/b/fake-topic-partition-0",
+                "persistent://a/b/fake-topic",
+                0);
+
+        testCollapsePartitionedTopic(true,
+                "persistent://a/b/fake-topic-partition-1",
+                "persistent://a/b/fake-topic",
+                1);
+
+        testCollapsePartitionedTopic(true,
+                "persistent://a/b/fake-topic",
+                "persistent://a/b/fake-topic",
+                0);
+
+        testCollapsePartitionedTopic(true,
+                "fake-topic-partition-5",
+                "persistent://public/default/fake-topic",
+                5);
+    }
+
+    @Test
+    public void collapsePartitionedTopicDisabledTest() throws Exception {
+        testCollapsePartitionedTopic(false,
+                "persistent://a/b/fake-topic-partition-0",
+                "persistent://a/b/fake-topic-partition-0",
+                0);
+
+        testCollapsePartitionedTopic(false,
+                "persistent://a/b/fake-topic-partition-1",
+                "persistent://a/b/fake-topic-partition-1",
+                0);
+
+        testCollapsePartitionedTopic(false,
+                "persistent://a/b/fake-topic",
+                "persistent://a/b/fake-topic",
+                0);
+
+        testCollapsePartitionedTopic(false,
+                "fake-topic-partition-5",
+                "fake-topic-partition-5",
+                0);
+    }
+
+    private void testCollapsePartitionedTopic(boolean isEnabled,
+                                              String pulsarTopic,
+                                              String expectedKafkaTopic,
+                                              int expectedPartition) throws Exception {
+        props.put("kafkaConnectorSinkClass", SchemaedFileStreamSinkConnector.class.getCanonicalName());
+        props.put("collapsePartitionedTopics", Boolean.toString(isEnabled));
+
+        KafkaConnectSink sink = new KafkaConnectSink();
+        sink.open(props, context);
+
+        AvroSchema<PulsarSchemaToKafkaSchemaTest.StructWithAnnotations> pulsarAvroSchema
+                = AvroSchema.of(PulsarSchemaToKafkaSchemaTest.StructWithAnnotations.class);
+
+        final GenericData.Record obj = new GenericData.Record(pulsarAvroSchema.getAvroSchema());
+        obj.put("field1", (byte) 10);
+        obj.put("field2", "test");
+        obj.put("field3", (short) 100);
+
+        final GenericRecord rec = getGenericRecord(obj, pulsarAvroSchema);
+        Message msg = mock(MessageImpl.class);
+        when(msg.getValue()).thenReturn(rec);
+        when(msg.getKey()).thenReturn("key");
+        when(msg.hasKey()).thenReturn(true);
+        when(msg.getMessageId()).thenReturn(new MessageIdImpl(1, 0, 0));
+
+        final AtomicInteger status = new AtomicInteger(0);
+        Record<GenericObject> record = PulsarRecord.<String>builder()
+                .topicName(pulsarTopic)
+                .message(msg)
+                .schema(pulsarAvroSchema)
+                .ackFunction(status::incrementAndGet)
+                .failFunction(status::decrementAndGet)
+                .build();
+
+        SinkRecord sinkRecord = sink.toSinkRecord(record);
+
+        Assert.assertEquals(sinkRecord.topic(), expectedKafkaTopic);
+        Assert.assertEquals((int)sinkRecord.kafkaPartition(), expectedPartition);
+
+        sink.close();
+    }
+
     @SneakyThrows
     private java.util.Date getDateFromString(String dateInString) {
         SimpleDateFormat formatter = new SimpleDateFormat("dd/MM/yyyy hh:mm:ss");